Kafka tutorial

Log·2022년 10월 19일
0

Kafka

목록 보기
4/9
post-thumbnail

Kafka tutorial을 위해 책에서는 EC2를 이용하였으나, Docker를 이용해서 진행하고자 한다.

Kafka with Docker

Docker로 amazon linux 접속

# amazon linux install
docker pull amazonlinux

# port 9092, 2181 열어서 실행
docker run -it -p 9092:9092 -p 2181:2181 --name kafka_tutorial amazonlinux /bin/bash

Java 설치

# openjdk 1.8
yum install -y java-1.8.0-openjdk-devel.x86_64
# install 확인
java -version

아래와 같이 나오면 성공

openjdk version "1.8.0_342"
OpenJDK Runtime Environment (build 1.8.0_342-b07)
OpenJDK 64-Bit Server VM (build 25.342-b07, mixed mode)

Kafka 설치 및 실행

최근 버전을 좋아하므로... 최근 버전으로 진행해보고자 한다.(물론 production에서는 stable 버전을 사용해야함)

# wget, tar install
yum install -y wget tar
# kafka 3.3.1, scalar 2.12 version
wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz
# unzip
tar xvf kafka_2.12-3.3.1.tgz

브로커 힙 메모리 설정

kafka 브로커는 레코드의 내용은 페이지 캐시로 시스템 메모리를 사용하고, 나머지 객체들은 힙 메모리에 저장하여 사용한다는 특징이 있다.
그래서 kafka 브로커의 경우 힙 메모리를 6GB 이상 설정하지 않는 것을 권장한다.
책에서는 설정 값을 400m로 설정 하던데, 도커이니 한번 2GB를 줘서 실행해보고자 한다.

# Xmx : Java 힙의 최대 크기를 지정하는 것
# Xms : Java 힙의 최초 크기를 지정하는 것
export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"

브로커 실행옵션

config/server.properties에서 카프카 브로커가 클러스터 운영에 필요한 옵션들을 지정할 수 있다. 이번 튜토리얼에서 advertised.listeners=PLAINTEXT://172.19.0.2:9092로 수정(docker container ip)
config/server.properties에 있는 대표적인 옵션은 아래와 같다.

  • broker.id
    실행하는 카프카 브로커의 번호로, 클러스터를 구축할 때 브로커들은 구분하기 위해 identity한 번호로 설정해야 한다. 만약, 다른 카프카 브로커와 동일한 id를 가질 경우 비정상적인 동작이 발생할 수 있다.
  • listeners
    카프카 브로커가 통신을 위해 열어둘 인터페이스 IP, port, protocol을 설정할 수 있으며, 설정을 따로 하지 않으면 모든 IP와 port에서 접속할 수 있다.
  • advertised.listners
    카프카 클라이언트 또는 카프카 커맨드 라인 툴에서 접속할 때 사용할 IP와 port정보
  • listener.security.protocol.map
    SASL_SSL, SASL_PLAIN 보안 설정 시 프로토콜 매핑을 위한 설정
  • num.network.threads
    네트워크를 통한 처리를 할 때 사용할 네트워크 스레드 개수
  • num.io.threads
    카프카 브로커 내부에서 사용할 스레드 개수
  • log.dirs
    카프카의 경우, 모든 메시지를 로그 세그먼트 파일에 모아서 디스크에 저장한다. 이를 위해 통신을 통해 가져온 데이터를 파일로 저장할 디렉토리의 위치로 경로를 쉼표로 구분해서 여러개의 경로에 저장할 수 있다.
  • num.partitions
    파티션의 개수로, 파티션의 수가 많아지면 병렬처리 데이터의 양이 증가 한다.
  • log.retention.hours
    브로커가 메시지가 삭제되기까지 걸리는 시간으로 log.retention.minutes 또는 log.retention.ms로도 사용할 수 있다. log.retention.ms값을 설정하여 운영하는 것을 추천하며, log.retention.ms=-1이면 영원히 삭제되지 않는다.
  • log.segment.bytes
    브로커가 메시지의 최대 크기를 지정하는 것으로, 데이터양이 많아 이 크기를 채우게 되면 새로운 파일이 생성된다.
  • log.retention.check.interval.ms
    브로커가 메시지를 삭제하기 위해 체크하는 간격
  • zookeeper.connect
    브로커와 연동할 주키퍼의 IP와 port
  • zookeeper.connection.timeout.ms
    주키퍼의 세션 타임아웃 시간으로, 주키퍼에 연결할 때 시간 제한을 두고 그 이상이 되면 더 이상 시도를 하지 않는다

주키퍼 실행

카프카를 실행하기 위해서는 주키퍼가 반드시 필요하다. 주키퍼는 클러스터 설정 리더 정보, 컨트롤러 정보를 담고 있으며, 상용 환경에서는 안전하게 3대 이상의 서버로 구성하여 사용하여야 한다.
여기서는 1대만 실행할 것이며, 이를 Quick-and-dirty single-node라고 부른다고 한다.

# start zookeeper (daemon to run background, if you want to run foreground remove that argument)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

# check zookeeper run normally
jps -vm

주키퍼란? https://velog.io/@2h-kim/Zookeeper

카프카 브로커 실행

# start kafka broker (daemon to run background, if you want to run foreground 
bin/kafka-server-start.sh -daemon config/server.properties

# check kafka run normally
jps -m

# show broker log
tail -f logs/server.log

local과의 통신 확인

윈도우에서는 wsl로 실행

# kafka download
curl https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz --output kafka.tgz

# unzip
tar -xvf kafka.tgz
cd kafka_2.12-3.3.1/

docker과 wsl 네트워크 통신이 안 되는 것 같아서 아래와 같이 진행(여기서 삽질 엄청 함...ㅎㅎㅎ)

# network 생성
docker network create --driver bridge test-network
# container network 연결
docker network connect test-network kafka_tutorial
# network 연결된 docker container 생성
docker run -itd --name kafka_connect_test_ubuntu --net=test-network ubuntu /bin/bash
# exec
docker exec -it kafka_connect_test_ubuntu /bin/bash
# kafka download
curl https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz --output kafka.tgz
# unzip
tar -xvf kafka.tgz
cd kafka_2.12-3.3.1/
# run
bin/kafka-broker-api-versions.sh --bootstrap-server kafka_tutorial:9092 

결과 적으로 아래와 같이 connection 성공

172.19.0.2:9092 (id: 0 rack: null) -> (
        Produce(0): 0 to 9 [usable: 9],
        Fetch(1): 0 to 13 [usable: 13],
        ListOffsets(2): 0 to 7 [usable: 7],
        ...
)



Kafka command-line tool

카프카 커맨드 라인 툴들은 카프카를 운영할 때 가장 많이 접하는 도구로, 커맨드 라인 툴을 이용하여, 브로커 운영에 필요한 다양한 명령을 내릴 수 있다.

--bootstrap-server vs --zookeeper
kafka 2.1버전 포함 이전 버전에서는 kafka command-line tool이 주키퍼와 직접 통신하여 명령을 실행하였으나, 2.2 버전 이후부터는 카프카를 통해 토픽과 관련된 명령을 실행할 수 있게 되었다.


kafka-topics.sh

토픽 생성
  • --create로 topic을 생성하라는 명령어를 날림
  • --bootstrap-server로 토픽을 생성할 카프카 클러스터를 구성하는 브로커들의 IP와 port를 적는다.
  • --topic에서 토픽 이름을 적으며, 내부 데이터가 무엇이 있는지 유추가 가능할 정도로 자세히 적는 것을 추천한다.
  • (Option) --partitions로 파티션의 개수를 지정할 수 있으며, default는 server.properties의 num.partitions이다.
  • (Option) --replication-factor로 토픽의 파티션을 복제할 복제 개수를 적는다. 1의 경우는 복제를 하지 않는 것이며, 2이면 1개의 복제본을 사용하겠다는 의미이다. max값은 브로커의 개수며, 명시하지 않으면 default.replication.factor옵션의 값을 따른다.
  • (Option) --config를 통해 추가적인 설정을 할 수 있다.
bin/kafka-topics.sh \
 --create \
 --bootstrap-server kafka_tutorial:9092 \
 --partitions 3 \
 --replication-factor 1 \
 --config retention.ms=86400000 \
 --topic topic_kafka_test
Created topic topic_kafka_test.

토픽 생성
토픽을 생성하는 방법은 크게 2가지로 아래와 같다.
토픽 생성 시 토픽에 들어오는 데이터 양이나, 병렬로 처리되어야 하는 용량, 보관 기간 등 잘 파악하여 생성하는 것이 중요하다.

  • 카프카 컨슈머 또는 프로듀서가 카프카 브로커에 생성되지 않은 토픽에 대해 요청할 때
  • 커맨드 라인 툴로 명시적으로 토픽을 생성(이것을 추천)
토픽 리스트 조회
bin/kafka-topics.sh --bootstrap-server kafka_tutorial:9092 --list
토픽 상세 조회

파티션이 몇개인지, 복제된 파티션이 위치한 브로커의 번호, 구성하는 설정 등을 출력한다. 또한, 토픽이 가진 파티션의 리더가 현재 어느 브로커에 있는지 확인 가능하다.

bin/kafka-topics.sh --bootstrap-server kafka_tutorial:9092 --describe --topic topi
c_kafka_test
Topic: topic_kafka_test TopicId: TI_ehzIXQzS0bg6CL2d2-w PartitionCount: 3       ReplicationFactor: 1    Configs: retention.ms=864000
        Topic: topic_kafka_test Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: topic_kafka_test Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: topic_kafka_test Partition: 2    Leader: 0       Replicas: 0     Isr: 0
토픽 옵션 수정

파티션의 개수 수정은 kafka-topic.sh, 토픽 리텐션 기간 변경은 kafka-configs.sh를 사용하여야 한다.

# kafka partition count change
bin/kafka-topics.sh --bootstrap-server kafka_tutorial:9092 \
 --topic topic_kafka_test \
 --alter \
 --partitions 5
 
# kafka topic configure change
# if configure is already exists then change configure, else add configure 
bin/kafka-configs.sh --bootstrap-server kafka_tutorial:9092 \
 --entity-type topics \
 --entity-name topic_kafka_test \
 --alter --add-config retention.ms=86400000

kafka-console-producer.sh

토픽에 데이터를 넣을 수 있는 명령어로, 토픽에 넣는 데이터는 record라고 부르며 key,value 로 이루어져 있다.
키 값 없이 전송
이 때, 메세지 키는 null로 기본 설정되어 브로커로 전송된다.

bin/kafka-console-producer.sh \
 --bootstrap-server kafka_tutorial:9092 \
 --topic topic_kafka_test

>hello kafka
>test
>1
>2
>3
>4
>5
>;;

메시지 키를 가지는 레코드 전송
만약 key.separator에 해당하는 값을 전송하지 않으면 org.apache.kafka.common.KafkaException: No key separator found on line number 1: 'k-v'에러 발생

# default of key.separator = \t
bin/kafka-console-producer.sh \
 --bootstrap-server kafka_tutorial:9092 \
 --topic topic_kafka_test \
 --property "parse.key=true" \
 --property "key.seperator=:"

kafka-console-consumer.sh

토픽으로 전송한 데이터를 가져올 수 있는 명령어로 --from-beginning옵션을 통해 토픽에 저장된 가장 처음 데이터부터 출력할 수 있다.
--group을 이용하여 컨슈머 그룹을 생성할 수 있다. 컨슈머 그룹은 1개 이상의 컨슈머로 이루어졌으며, 해당 컨슈머 그룹을 통해 가져간 토픽의 메시지는 가져갔다고 commit을 한다. commit은 내가 여기까지 가져갔어라고 브로커에 오프셋 번호를 저장하는 것으로, __consumer_offsets이름의 내부 토픽에 저장된다.

bin/kafka-console-consumer.sh \
 --bootstrap-server kafka_tutorial:9092 \
 --topic topic_kafka_test \
 --from-beginning
 
# if want to see message key, value
bin/kafka-console-consumer.sh \
 --bootstrap-server kafka_tutorial:9092 \
 --topic topic_kafka_test \
 --property print.key=true \
 --property key.separator="-" \
 --from-beginning
 
# consumer group
bin/kafka-console-consumer.sh \
 --bootstrap-server kafka_tutorial:9092 \
 --topic topic_kafka_test \
 --property print.key=true \
 --group consumer-group-test \
 --from-beginning

output of below command-line

null    hello kafka
null    test
null    1
null    2
null    3
null    4
null    5
null    ;;
k2      v2
k1      v1
k3      v3

partition 수가 2개 이상일 경우, 토픽에 넣은 데이터의 순서를 보장하지 못한다. 만약 데이터의 순서를 보장하고 싶다면 파티션 1개로 구성된 토픽을 만드는 것이다.


kafka-consumer-groups.sh

Consumer group list

bin/kafka-consumer-groups.sh \
 --bootstrap-server kafka_tutorial:9092 \
 --list

특정 컨슈머 그룹 세부 정보

# describe consumer group which name is consumer-group-test 
bin/kafka-consumer-groups.sh  \
 --bootstrap-server kafka_tutorial:9092 \
 --group consumer-group-test \
 --describe

output

  • current-offset : 토픽의 가장 최신 오프셋이 몇 번인지 나타냄
  • log-end-offset : 컨슈머 그룹의 컨슈머가 어디까지 커밋했는지
  • lag : 컨슈머 그룹이 토픽의 파티션에 있는 데이터를 가져가는 데에 얼마나 지연이 발생하였는지
GROUP               TOPIC            PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
consumer-group-test topic_kafka_test 1          1               1               0               -               -               -
consumer-group-test topic_kafka_test 4          8               8               0               -               -               -
consumer-group-test topic_kafka_test 0          0               0               0               -               -               -
consumer-group-test topic_kafka_test 3          1               1               0               -               -               -
consumer-group-test topic_kafka_test 2          1               1               0               -               -    

kafka-verifiable-producer, counsumer.sh

kafka-verifiable로 시작하는 2개의 스크립트를 사용하면 string 타입 메시지 값을 코드 없이 주고 받을 수 있다. 간단한 네트워크 통신 테스트를 할 때 유용하다.

kafka-verifiable-consumer

bin/kafka-verifiable-producer.sh \
 --bootstrap-server kafka_tutorial:9092 \
 --max-messages 10 \
 --topic topic_kafka_test

kafka-verifiable-producer

bin/kafka-verifiable-consumer.sh \
 --bootstrap-server kafka_tutorial:9092 \
 --group-id consumer-group-test \
 --topic topic_kafka_test

kafka-delete-records.sh

이미 적재된 토픽의 데이터를 지우는 방법으로, 이미 적재된 토픽의 데이터 중 가장 오래된 데이터 부터 특정 시점의 오프셋까지 삭제가 가능하다.
파티션에 저장된 특정 데이터만 삭제할 수 없다는 점에서 유의가 필요하다.
~/test-delete-record.json

{
  "partitions" : [
    {
      "topic" : "topic_kafka_test",
      "partition": 4,
      "offset": 5
    }
    ],
  "version": 1
}

bash

bin/kafka-delete-records.sh \
 --bootstrap-server kafka_tutorial:9092 \
 --offset-json-file ~/test-delete-record.json

성공 시

Executing records delete operation
Records delete operation completed:
partition: topic_kafka_test-4   low_watermark: 5

현재 record보다 더 많은 offset 요구시

Executing records delete operation
Records delete operation completed:
partition: topic_kafka_test-0   error: org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server.



요약

  • zookeeper의 경우, kafka-cluster 설정 리더 정보, 컨트롤러 정보를 담고 있어 kafka를 실행하는 데 필수 애플리케이션이다.
  • producer를 이용해 데이터를 집어넣고 consumer를 통해 데이터를 가져오며, consumer-group을 이용하여 consumer가 어느 데이터 까지 가져갔는지 체크포인트를 남길 수 있다.
  • kafka command-line tool의 경우 운영 시 자주 사용하므로 손으로 익히는 것이 좋다.

추가 출처

profile
열심히 정리하는 습관 기르기..

0개의 댓글