Kafka cluster setup

ZooKeeper installation

Installing Kafka

  • Once ZooKeeper has been installed and started by following the link above, let's install Kafka.
  • First, create a kafka directory.
    cd /opt
    mkdir kafka
    cd kafka
  • Go to the link below, download the version you want, and extract the archive.
    https://kafka.apache.org/downloads
    wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
    
    tar -xzvf kafka_2.12-2.2.1.tgz
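  • You can quickly confirm the extracted layout; bin/ holds the startup scripts used below and config/ holds server.properties (the path assumes the archive was extracted under /opt/kafka):
    ls /opt/kafka/kafka_2.12-2.2.1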

Path configuration

  • Open the bashrc file to set up the Kafka paths.
    vim ~/.bashrc
  • Add the following lines.
    export KAFKA_HOME=/opt/kafka/kafka_2.12-2.2.1
    export PATH=$PATH:$KAFKA_HOME/bin
  • Apply the changes.
    source ~/.bashrc
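  • To confirm the variables took effect, you can print KAFKA_HOME and check that the Kafka scripts now resolve on the PATH (a quick optional check):
    echo $KAFKA_HOME
    which kafka-server-start.sh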

Kafka configuration

  • Configure Kafka for the cluster.
  • Go into the config folder.
    cd $KAFKA_HOME
    cd config
  • Edit the server.properties file.
    vim server.properties
  • Set broker.id to 0 on the master; each worker gets its own id (for example 1, 2, and 3 for worker1 through worker3).
  # The id of the broker. This must be set to a unique integer for each broker.
  broker.id=0

  # The number of threads that the server uses for receiving requests from the network and sending responses to the network
  num.network.threads=3

  # The number of threads that the server uses for processing requests, which may include disk I/O
  num.io.threads=8

  # The send buffer (SO_SNDBUF) used by the socket server
  socket.send.buffer.bytes=102400

  # The receive buffer (SO_RCVBUF) used by the socket server
  socket.receive.buffer.bytes=102400

  # The maximum size of a request that the socket server will accept (protection against OOM)
  socket.request.max.bytes=104857600

  # A comma separated list of directories under which to store log files
  log.dirs=/opt/kafka/kafka_2.12-2.2.1/kafka-logs

  # The default number of log partitions per topic. More partitions allow greater
  # parallelism for consumption, but this will also result in more files across
  # the brokers.
  num.partitions=1

  # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
  # This value is recommended to be increased for installations with data dirs located in RAID array.
  num.recovery.threads.per.data.dir=1

  ############################# Internal Topic Settings  #############################
  # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
  # For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
  offsets.topic.replication.factor=1
  transaction.state.log.replication.factor=1
  transaction.state.log.min.isr=1

  # The minimum age of a log file to be eligible for deletion due to age
  log.retention.hours=168

  # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
  # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
  #log.retention.bytes=1073741824

  # The maximum size of a log segment file. When this size is reached a new log segment will be created.
  log.segment.bytes=1073741824

  # The interval at which log segments are checked to see if they can be deleted according
  # to the retention policies
  log.retention.check.interval.ms=300000

  ############################# Zookeeper #############################

  # Zookeeper connection string (see zookeeper docs for details).
  # This is a comma separated host:port pairs, each corresponding to a zk
  # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
  # You can also append an optional chroot string to the urls to specify the
  # root directory for all kafka znodes.
  zookeeper.connect=master:2181,worker1:2181,worker2:2181,worker3:2181

  # Timeout in ms for connecting to zookeeper
  zookeeper.connection.timeout.ms=6000


  ############################# Group Coordinator Settings #############################

  # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
  # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
  # The default value for this is 3 seconds.
  # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
  # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
  group.initial.rebalance.delay.ms=0
  delete.topic.enable=true

broker.id must be set to a different number on each server, following the server order.

To connect to ZooKeeper, list the ZooKeeper servers in zookeeper.connect.
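
For example, with the numbering above, worker2's server.properties would differ from the master's only in its broker id, while the zookeeper.connect line stays identical on every broker:

  # worker2: config/server.properties
  broker.id=2
  zookeeper.connect=master:2181,worker1:2181,worker2:2181,worker3:2181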

  • Since log.dirs was changed, create the directory at the path you specified.
    cd $KAFKA_HOME
    mkdir kafka-logs

Deployment

  • Distribute the Kafka installation configured on the master to the workers.

    # copy to worker1
    scp -r $KAFKA_HOME hadoop@worker1:/opt/kafka

    # copy to worker2
    scp -r $KAFKA_HOME hadoop@worker2:/opt/kafka

    # copy to worker3
    scp -r $KAFKA_HOME hadoop@worker3:/opt/kafka

After copying, edit broker.id in each worker's config/server.properties, as in the sketch below.
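
A minimal sketch of that edit, assuming passwordless SSH as the hadoop user and the broker numbering used above (worker1=1, worker2=2, worker3=3):

    # rewrite broker.id on each worker after the copy
    for i in 1 2 3; do
      ssh hadoop@worker$i \
        "sed -i 's/^broker.id=0/broker.id=$i/' /opt/kafka/kafka_2.12-2.2.1/config/server.properties"
    done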

Running Kafka

  • To start Kafka, run the following command.
    kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
  • Run the same command on every server.
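  • To verify that every broker registered with ZooKeeper, you can list the broker ids from any node using zookeeper-shell.sh, which ships in Kafka's bin directory; with the numbering above this should return [0, 1, 2, 3]:
    zookeeper-shell.sh master:2181 ls /brokers/ids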

Testing

  • Let's check that the cluster is working.
  • Run jps on each server and confirm that the Kafka process is up.
  • Create a topic on the master server.
    kafka-topics.sh --create --zookeeper master:2181 --topic MyTest --partitions 1 --replication-factor 1
  • On the worker1 server, check that the topic was created.
    kafka-topics.sh --list --zookeeper worker1:2181
  • If the MyTest topic appears, it worked.
  • Now let's delete the MyTest topic. Run the following command on any server.
    kafka-topics.sh --delete --zookeeper master:2181 --topic MyTest
  • Log in to another server and check that the topic was deleted.
    kafka-topics.sh --list --zookeeper worker1:2181
  • If nothing is listed, it worked.
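  • As an optional further check, you could recreate the topic and push a message through one broker and read it back from another. This assumes the default listener port 9092; in Kafka 2.2.1 the console producer takes --broker-list while the consumer takes --bootstrap-server.
    kafka-topics.sh --create --zookeeper master:2181 --topic MyTest --partitions 1 --replication-factor 3
    kafka-console-producer.sh --broker-list master:9092 --topic MyTest
    kafka-console-consumer.sh --bootstrap-server worker1:9092 --topic MyTest --from-beginning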

