wget https://downloads.apache.org/kafka/3.3.2/kafka-3.3.2-src.tgz
tar xvf kafka_2.12-3.3.2.tgz
cd kafka_2.12-3.3.2
export KAFKA_HEAP_OPTS="-Xmx400m -Xms400m" // 카프카 힙 메모리 조정
echo ${KAFKA_HEAP_OPTS} // 설정값 확인
cd ~/kafka-3.3.2-src/config
vi server.properties
server.properties
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
jps // 자바 프로세스 조회
주키퍼인 QuorumPeerMain가 떠있다.
bin/kafka-server-start.sh -daemon config/server.properties
jps
tail -f logs/* // 카프카 로그 조회
정상적으로 실행되었다.
curl https://archive.apache.org/dist/kafka/3.3.2/kafka_2.13-3.3.2.tgz --output kafka.tgz
cd kafka_2.13-3.3.2/bin
// 1. 테스트 토픽 생성
./kafka-topics.sh --create --bootstrap-server {aws ec2 public ip}:9092 --replication-factor 1 --partitions 3 --topic test
// 2. 토픽에 데이터 넣기
./kafka-console-producer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test
// 3. 들어간 데이터 "모두" 확인하기
./kafka-console-consumer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test --from-beginning
// 4. 그룹별로 데이터 확인하기
./kafka-console-consumer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test -group testgroup --from-beginning
// 5. 생성된 그룹 확인하기
./kafka-consumer-groups.sh --bootstrap-server {aws ec2 public ip}:9092 --list
// 6. 그룹 세부정보 확인하기
./kafka-consumer-groups.sh --bootstrap-server {aws ec2 public ip}:9092 --group testgroup --describe
// 7-1. 그룹 오프셋 되돌리기
./kafka-consumer-groups.sh --bootstrap-server {aws ec2 public ip}:9092 --group testgroup --topic test --reset-offsets --to-earliest --execute
// 7-2. 그룹 오프셋의 특정 파티션만 되돌리기 --topic {토픽:파티션} --to-offset {되돌릴 위치}
./kafka-consumer-groups.sh --bootstrap-server {aws ec2 public ip}:9092 --group testgroup --topic test:1 --reset-offsets --to-offset 10 --execute
1~3. 토픽 생성 / 프로듀서에서 데이터 입력 / 컨슈머에서 데이터 조회
// 1. 테스트 토픽 생성
./kafka-topics.sh --create --bootstrap-server {aws ec2 public ip}:9092 --replication-factor 1 --partitions 3 --topic test
// 2. 토픽에 데이터 넣기
./kafka-console-producer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test
// 3. 들어간 데이터 확인하기
./kafka-console-consumer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test --from-beginning
프로듀서에서 만든 데이터가 컨슈머에서 확인되는 것을 볼 수 있다.
4. 그룹별로 데이터 확인하기
// 4. 그룹별로 데이터 확인하기
./kafka-console-consumer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test -group testgroup --from-beginning
테스트 그룹을 지정한 경우 해당 그룹이 어디까지 읽었는지 카프카에 기록된다. 따라서 다시 동일한 명령어를 수행하면 이전에 읽은 내용 다음부터 출력된다.
5. 생성된 그룹 리스트를 확인한다.
// 5. 생성된 그룹 확인하기
./kafka-consumer-groups.sh --bootstrap-server {aws ec2 public ip}:9092 --list
6. 그룹 세부정보 확인하기
// 6. 그룹 세부정보 확인하기
./kafka-consumer-groups.sh --bootstrap-server {aws ec2 public ip}:9092 --group testgroup --describe
7. 그룹의 오프셋 데이터 리셋하기
// 7-1. 그룹 오프셋 되돌리기
./kafka-consumer-groups.sh --bootstrap-server {aws ec2 public ip}:9092 --group testgroup --topic test --reset-offsets --to-earliest --execute
// 7-2. 그룹 오프셋의 특정 파티션만 되돌리기 --topic {토픽:파티션} --to-offset {되돌릴 위치}
./kafka-consumer-groups.sh --bootstrap-server {aws ec2 public ip}:9092 --group testgroup --topic test:1 --reset-offsets --to-offset 10 --execute
❗️카프카 브로커 버전과 dependency 버전이 일치해야 한다!!!❗️
build.gradle
implementation 'org.apache.kafka:kafka-clients:3.3.2'
// 생성자
ProducerRecord(String topic, V value)
ProducerRecord(String topic, K key, V value)
ProducerRecord(String topic, Integer partition, K key, V value)
// 예시
ProducerRecord<K, V> record = new ProducerRecord<>(TOPIC_NAME, Integer.toString(index), data)
서비스 요구사항에 맞는 생성자를 사용하면 된다.
acks = 0 : 속도 빠름/ 유실 가능성 높음
acks = 1 (default) : 속도 보통 / 유실 가능성 있음
acks = all or -1 : 속도 느림 / 유실 가능성 없음
ENABLE_AUTO_COMMIT
commitSync
commitAsync
...
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
...
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value()); // 작업 수행
}
consumer.commitSync();
}
클라이언트 셧다운시 중복/유실 방지
{
...
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
consumer.commitSync();
}
} catch (WakeupException e) {
System.out.println("WakeupException");
} finally {
consumer.commitSync();
consumer.close();
}
}
public void shutdown() {
consumer.wakeup();
}