Kafka tutorial을 위해 책에서는 EC2를 이용하였으나, Docker를 이용해서 진행하고자 한다.
# amazon linux install
docker pull amazonlinux
# port 9092, 2181 열어서 실행
docker run -it -p 9092:9092 -p 2181:2181 --name kafka_tutorial amazonlinux /bin/bash
# openjdk 1.8
yum install -y java-1.8.0-openjdk-devel.x86_64
# install 확인
java -version
아래와 같이 나오면 성공
openjdk version "1.8.0_342"
OpenJDK Runtime Environment (build 1.8.0_342-b07)
OpenJDK 64-Bit Server VM (build 25.342-b07, mixed mode)
최근 버전을 좋아하므로... 최근 버전으로 진행해보고자 한다.(물론 production에서는 stable 버전을 사용해야함)
# wget, tar install
yum install -y wget tar
# kafka 3.3.1, scalar 2.12 version
wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz
# unzip
tar xvf kafka_2.12-3.3.1.tgz
kafka 브로커는 레코드의 내용은 페이지 캐시로 시스템 메모리를 사용하고, 나머지 객체들은 힙 메모리에 저장하여 사용한다는 특징이 있다.
그래서 kafka 브로커의 경우 힙 메모리를 6GB 이상 설정하지 않는 것을 권장한다.
책에서는 설정 값을 400m로 설정 하던데, 도커이니 한번 2GB를 줘서 실행해보고자 한다.
# Xmx : Java 힙의 최대 크기를 지정하는 것
# Xms : Java 힙의 최초 크기를 지정하는 것
export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"
config/server.properties에서 카프카 브로커가 클러스터 운영에 필요한 옵션들을 지정할 수 있다. 이번 튜토리얼에서 advertised.listeners=PLAINTEXT://172.19.0.2:9092로 수정(docker container ip)
config/server.properties에 있는 대표적인 옵션은 아래와 같다.
broker.idlistenersadvertised.listnerslistener.security.protocol.mapnum.network.threadsnum.io.threadslog.dirsnum.partitionslog.retention.hourslog.retention.minutes 또는 log.retention.ms로도 사용할 수 있다. log.retention.ms값을 설정하여 운영하는 것을 추천하며, log.retention.ms=-1이면 영원히 삭제되지 않는다.log.segment.byteslog.retention.check.interval.mszookeeper.connectzookeeper.connection.timeout.ms카프카를 실행하기 위해서는 주키퍼가 반드시 필요하다. 주키퍼는 클러스터 설정 리더 정보, 컨트롤러 정보를 담고 있으며, 상용 환경에서는 안전하게 3대 이상의 서버로 구성하여 사용하여야 한다.
여기서는 1대만 실행할 것이며, 이를 Quick-and-dirty single-node라고 부른다고 한다.
# start zookeeper (daemon to run background, if you want to run foreground remove that argument)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# check zookeeper run normally
jps -vm
# start kafka broker (daemon to run background, if you want to run foreground
bin/kafka-server-start.sh -daemon config/server.properties
# check kafka run normally
jps -m
# show broker log
tail -f logs/server.log
윈도우에서는 wsl로 실행
# kafka download
curl https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz --output kafka.tgz
# unzip
tar -xvf kafka.tgz
cd kafka_2.12-3.3.1/
docker과 wsl 네트워크 통신이 안 되는 것 같아서 아래와 같이 진행(여기서 삽질 엄청 함...ㅎㅎㅎ)
# network 생성 docker network create --driver bridge test-network # container network 연결 docker network connect test-network kafka_tutorial # network 연결된 docker container 생성 docker run -itd --name kafka_connect_test_ubuntu --net=test-network ubuntu /bin/bash # exec docker exec -it kafka_connect_test_ubuntu /bin/bash # kafka download curl https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz --output kafka.tgz # unzip tar -xvf kafka.tgz cd kafka_2.12-3.3.1/ # run bin/kafka-broker-api-versions.sh --bootstrap-server kafka_tutorial:9092결과 적으로 아래와 같이 connection 성공
172.19.0.2:9092 (id: 0 rack: null) -> ( Produce(0): 0 to 9 [usable: 9], Fetch(1): 0 to 13 [usable: 13], ListOffsets(2): 0 to 7 [usable: 7], ... )
카프카 커맨드 라인 툴들은 카프카를 운영할 때 가장 많이 접하는 도구로, 커맨드 라인 툴을 이용하여, 브로커 운영에 필요한 다양한 명령을 내릴 수 있다.
--bootstrap-servervs--zookeeper
kafka 2.1버전 포함 이전 버전에서는 kafka command-line tool이 주키퍼와 직접 통신하여 명령을 실행하였으나, 2.2 버전 이후부터는 카프카를 통해 토픽과 관련된 명령을 실행할 수 있게 되었다.
--create로 topic을 생성하라는 명령어를 날림--bootstrap-server로 토픽을 생성할 카프카 클러스터를 구성하는 브로커들의 IP와 port를 적는다.--topic에서 토픽 이름을 적으며, 내부 데이터가 무엇이 있는지 유추가 가능할 정도로 자세히 적는 것을 추천한다.--partitions로 파티션의 개수를 지정할 수 있으며, default는 server.properties의 num.partitions이다.--replication-factor로 토픽의 파티션을 복제할 복제 개수를 적는다. 1의 경우는 복제를 하지 않는 것이며, 2이면 1개의 복제본을 사용하겠다는 의미이다. max값은 브로커의 개수며, 명시하지 않으면 default.replication.factor옵션의 값을 따른다.--config를 통해 추가적인 설정을 할 수 있다.bin/kafka-topics.sh \
--create \
--bootstrap-server kafka_tutorial:9092 \
--partitions 3 \
--replication-factor 1 \
--config retention.ms=86400000 \
--topic topic_kafka_test
Created topic topic_kafka_test.
토픽 생성
토픽을 생성하는 방법은 크게 2가지로 아래와 같다.
토픽 생성 시 토픽에 들어오는 데이터 양이나, 병렬로 처리되어야 하는 용량, 보관 기간 등 잘 파악하여 생성하는 것이 중요하다.
- 카프카 컨슈머 또는 프로듀서가 카프카 브로커에 생성되지 않은 토픽에 대해 요청할 때
- 커맨드 라인 툴로 명시적으로 토픽을 생성(이것을 추천)
bin/kafka-topics.sh --bootstrap-server kafka_tutorial:9092 --list
파티션이 몇개인지, 복제된 파티션이 위치한 브로커의 번호, 구성하는 설정 등을 출력한다. 또한, 토픽이 가진 파티션의 리더가 현재 어느 브로커에 있는지 확인 가능하다.
bin/kafka-topics.sh --bootstrap-server kafka_tutorial:9092 --describe --topic topi
c_kafka_test
Topic: topic_kafka_test TopicId: TI_ehzIXQzS0bg6CL2d2-w PartitionCount: 3 ReplicationFactor: 1 Configs: retention.ms=864000
Topic: topic_kafka_test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: topic_kafka_test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: topic_kafka_test Partition: 2 Leader: 0 Replicas: 0 Isr: 0
파티션의 개수 수정은 kafka-topic.sh, 토픽 리텐션 기간 변경은 kafka-configs.sh를 사용하여야 한다.
# kafka partition count change
bin/kafka-topics.sh --bootstrap-server kafka_tutorial:9092 \
--topic topic_kafka_test \
--alter \
--partitions 5
# kafka topic configure change
# if configure is already exists then change configure, else add configure
bin/kafka-configs.sh --bootstrap-server kafka_tutorial:9092 \
--entity-type topics \
--entity-name topic_kafka_test \
--alter --add-config retention.ms=86400000
토픽에 데이터를 넣을 수 있는 명령어로, 토픽에 넣는 데이터는 record라고 부르며 key,value 로 이루어져 있다.
키 값 없이 전송
이 때, 메세지 키는 null로 기본 설정되어 브로커로 전송된다.
bin/kafka-console-producer.sh \
--bootstrap-server kafka_tutorial:9092 \
--topic topic_kafka_test
>hello kafka
>test
>1
>2
>3
>4
>5
>;;
메시지 키를 가지는 레코드 전송
만약 key.separator에 해당하는 값을 전송하지 않으면 org.apache.kafka.common.KafkaException: No key separator found on line number 1: 'k-v'에러 발생
# default of key.separator = \t
bin/kafka-console-producer.sh \
--bootstrap-server kafka_tutorial:9092 \
--topic topic_kafka_test \
--property "parse.key=true" \
--property "key.seperator=:"
토픽으로 전송한 데이터를 가져올 수 있는 명령어로 --from-beginning옵션을 통해 토픽에 저장된 가장 처음 데이터부터 출력할 수 있다.
--group을 이용하여 컨슈머 그룹을 생성할 수 있다. 컨슈머 그룹은 1개 이상의 컨슈머로 이루어졌으며, 해당 컨슈머 그룹을 통해 가져간 토픽의 메시지는 가져갔다고 commit을 한다. commit은 내가 여기까지 가져갔어라고 브로커에 오프셋 번호를 저장하는 것으로, __consumer_offsets이름의 내부 토픽에 저장된다.
bin/kafka-console-consumer.sh \
--bootstrap-server kafka_tutorial:9092 \
--topic topic_kafka_test \
--from-beginning
# if want to see message key, value
bin/kafka-console-consumer.sh \
--bootstrap-server kafka_tutorial:9092 \
--topic topic_kafka_test \
--property print.key=true \
--property key.separator="-" \
--from-beginning
# consumer group
bin/kafka-console-consumer.sh \
--bootstrap-server kafka_tutorial:9092 \
--topic topic_kafka_test \
--property print.key=true \
--group consumer-group-test \
--from-beginning
output of below command-line
null hello kafka
null test
null 1
null 2
null 3
null 4
null 5
null ;;
k2 v2
k1 v1
k3 v3
partition 수가 2개 이상일 경우, 토픽에 넣은 데이터의 순서를 보장하지 못한다. 만약 데이터의 순서를 보장하고 싶다면 파티션 1개로 구성된 토픽을 만드는 것이다.
Consumer group list
bin/kafka-consumer-groups.sh \
--bootstrap-server kafka_tutorial:9092 \
--list
특정 컨슈머 그룹 세부 정보
# describe consumer group which name is consumer-group-test
bin/kafka-consumer-groups.sh \
--bootstrap-server kafka_tutorial:9092 \
--group consumer-group-test \
--describe
output
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
consumer-group-test topic_kafka_test 1 1 1 0 - - -
consumer-group-test topic_kafka_test 4 8 8 0 - - -
consumer-group-test topic_kafka_test 0 0 0 0 - - -
consumer-group-test topic_kafka_test 3 1 1 0 - - -
consumer-group-test topic_kafka_test 2 1 1 0 - -
kafka-verifiable로 시작하는 2개의 스크립트를 사용하면 string 타입 메시지 값을 코드 없이 주고 받을 수 있다. 간단한 네트워크 통신 테스트를 할 때 유용하다.
kafka-verifiable-consumer
bin/kafka-verifiable-producer.sh \
--bootstrap-server kafka_tutorial:9092 \
--max-messages 10 \
--topic topic_kafka_test
kafka-verifiable-producer
bin/kafka-verifiable-consumer.sh \
--bootstrap-server kafka_tutorial:9092 \
--group-id consumer-group-test \
--topic topic_kafka_test
이미 적재된 토픽의 데이터를 지우는 방법으로, 이미 적재된 토픽의 데이터 중 가장 오래된 데이터 부터 특정 시점의 오프셋까지 삭제가 가능하다.
파티션에 저장된 특정 데이터만 삭제할 수 없다는 점에서 유의가 필요하다.
~/test-delete-record.json
{
"partitions" : [
{
"topic" : "topic_kafka_test",
"partition": 4,
"offset": 5
}
],
"version": 1
}
bash
bin/kafka-delete-records.sh \
--bootstrap-server kafka_tutorial:9092 \
--offset-json-file ~/test-delete-record.json
성공 시
Executing records delete operation
Records delete operation completed:
partition: topic_kafka_test-4 low_watermark: 5
현재 record보다 더 많은 offset 요구시
Executing records delete operation
Records delete operation completed:
partition: topic_kafka_test-0 error: org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server.