Kafka 커맨드 라인 명령어

망7H·2021년 4월 27일
1

이 포스팅은 카프카 관련 애플리케이션 제작 과정에서 커맨드 라인 명령어를 반복적으로 치게되면서 한번 정리하면 좋겠다 싶어서 정리하였습니다.
(지속적으로 내용은 추가할 예정입니다.)
※ 포스팅에 쓰인 my-kafka는 EC2 인스턴스 발급시 생성되는 ip를 의미합니다.

1. 토픽

1) 토픽 생성

(1) 기본 토픽 생성

bin/kafka-topics.sh \
--create \
--bootstrap-server my-kafka:9092 \
--topic [토픽 이름]

(2) 옵션을 추가한 토픽 생성

bin/kafka-topics.sh \
--create \
--bootstrap-server my-kafka:9092 \
--topic [토픽 이름] \
--partitions [생성할 파티션 수] \
--replication-factor [브로커 복제 계수] \
--config retention.ms=[토픽의 데이터를 유지할 시간 (단위: ms)]

2) 토픽 리스트 조회

bin/kafka-topics.sh \
--bootstrap-server my-kafka:9092 \
--list

3) 특정 토픽 상세 조회

토픽별 토픽명, 존재하는 파티션 번호, 리더 브로커 번호, 복제 계수, ISR 등을 알 수 있다.

bin/kafka-topics.sh \
--bootstrap-server my-kafka:9092 \
--topic [조회할 토픽 이름] \
--describe

4) 토픽 수정

(1) 토픽 수정

아래 명령을 보면 --alter 이하의 설정들을 모두 개별적으로 수정할 수 있다.

bin/kafka-topics.sh \
--bootstrap-server my-kafka:9092 \
--alter \
--partitions [변경할 파티션 수]

(2) --add-config를 사용한 토픽 설정 수정

토픽의 설정 정보를 수정할 때, --add-config 옵션을 사용하면 수정하려고 넣은 옵션이
기존에 없는 옵션이면 신규로 추가하고, 기존에 존재하는 옵션이면 값을 변경한다.

bin/kafka-topics.sh \
--bootstrap-server my-kafka:9092 \
--alter \
--add-config retention.ms=[토픽의 데이터를 유지할 시간 (단위: ms)]

(3) --delete-config를 사용한 토픽 설정 삭제

bin/kafka-topics.sh \
--bootstrap-server my-kafka:9092 \
--alter \
--delete-config retention.ms

5) 토픽의 레코드 삭제

(1) json 파일을 활용한 레코드 삭제

vi delete-topic.json 과 같은 명령으로 json 파일을 생성한다. (파일명은 자유롭게)

-- delete-topic.json
{
  "partitions": [
    {
      "topic": "[삭제할 레코드가 있는 토픽명]",
      "partition": [삭제할 레코드가 있는 파티션 번호],
      "offset": [처음부터 삭제할 offset 번호]
    }
  ],
  "version": 1 
}
bin/kafka-delete-records.sh \
--bootstrap-server my-kafka:9092 \
--offset-json-file delete-topic.json

2. 프로듀서

1) 레코드 produce

(1) key가 없고 value만 있는 메시지 produce

bin/kafka-console-producer.sh \
--boostrap-server my-kafka:9092 \
--topic [레코드를 저장할 토픽명]

(2) key와 value가 있는 메시지 produce

※ 메시지를 입력할때, 콜론을 사용하여 key:value 형태로 입력하도록 하려면 아래와 같이 명령어를 작성할 수 있다.

bin/kafka-console-producer.sh \
--boostrap-server my-kafka:9092 \
--topic [레코드를 저장할 토픽명] \
--property "parse.key=true" \
--property "key.separator=:"

3. 컨슈머

1) 레코드 consume

(1) key 없이 value만 보여주는 consume

--from-beginning 옵션은 토픽에 저장된 가장 첫 데이터부터 읽어온다.

bin/kafka-console-consumer.sh \
--bootstrap-server my-kafka:9092 \
--topic [레코드를 읽어 올 토픽명] \
--from-beginning

(2) key와 value를 보여주는 consume

--property print.key=true를 사용하여 메시지 키를 노출 시킬 수 있다.
--property key.separator="-"를 사용하여 key와 value를 하이픈(-)으로 분리해서 보여줄 수 있다.
--group [그룹명]을 사용하여 이 컨슈머의 컨슈머 그룹을 지정 또는 생성할 수 있다.

bin/kafka-console-consumer.sh \
--bootstrap-server my-kafka:9092 \
--topic [레코드를 읽어 올 토픽명] \
--property print.key=true \
--property key.separator="-" \
--group [이 컨슈머가 속할 그룹명 (지정한 그룹명이 없는 경우 새로 생성됨)] \
--from-beginning

2) 컨슈머 그룹

(1) 컨슈머 그룹에 속하는 컨슈머 리스트 조회

bin/kafka-consumer-groups.sh \
--bootstrap-server my-kafka:9092 \
--list

(2) 컨슈머 그룹이 어떤 토픽의 데이터를 처리하는지 조회

컨슈머 그룹명, 컨슘하고 있는 토픽명, 컨슘하고 있는 토픽의 파티션 번호,
컨슈머 그룹이 가져간 토픽의 파티션의 최신 오프셋,
컨슈머 그룹의 컨슈머가 컨슘한 오프셋,
컨슈머 그룹이 토픽의 파티션에 있는 데이터를 가져가는데 발생하는 지연. 랙(LAG),
컨슈머 아이디 등을 알 수 있다.

bin/kafka-consumer-groups.sh \
--bootstrap-server my-kafka:9092 \
--group [존재하는 컨슈머 그룹명]
--describe
profile
망한 개발자의 개발 기록입니다. 저를 타산지석으로 삼으시고 공부하세요.

0개의 댓글