Kafka Cluster Setup Guide

조윤호 · September 28, 2023

Overview

The project runs the cluster in the structure described below: three instances, each hosting one ZooKeeper node and one Kafka broker.

The main benefits of using Kafka here are:

  • Scalability through topics: different topic policies (ordering guarantees / exactly-once delivery / ISR) can be applied per topic
  • Horizontal scaling: easier to scale out than a conventional queue or Redis
  • Throughput control between brokers and consumers: the offset mechanism lets brokers and consumers share the load evenly, and messages are not lost even if a consumer fails

Installation Guide

1. Create the instances

If Java is not installed yet, install it first:

sudo apt-get update
sudo apt-get install openjdk-11-jdk
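
You can confirm the JDK is available before moving on:

java -version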

To refer to the other servers by hostname, add entries that map each IP address to a name.

cd /etc
sudo vi ./hosts

Open /etc/hosts and add the following entries (## my ip marks the machine you are editing):

  • zookeeper server 1
    0.0.0.0 broker1     ## my ip
    10.0.90.35 broker2
    10.0.71.136 broker3
  • zookeeper server 2
    10.0.75.183 broker1
    0.0.0.0 broker2        ## my ip
    10.0.71.136 broker3
  • zookeeper server 3
    10.0.75.183 broker1
    10.0.90.35 broker2
    0.0.0.0 broker3        ## my ip
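
To make sure the mappings are picked up, you can resolve each name on any of the three servers; the output should match the table above:

getent hosts broker1 broker2 broker3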

2. Install the ZooKeeper ensemble

wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.2/apache-zookeeper-3.8.2-bin.tar.gz
tar -xf ./apache-zookeeper-3.8.2-bin.tar.gz
mv ./apache-zookeeper-3.8.2-bin ~/zookeeper
mkdir -p ~/data/zookeeper/

# id : enter the number that identifies this ZooKeeper server (1, 2, or 3)
echo 1 > ~/data/zookeeper/myid

# copy the sample configuration and use it as the config file
cp ./zookeeper/conf/zoo_sample.cfg ./zookeeper/conf/zoo.cfg
vi ./zookeeper/conf/zoo.cfg

./zookeeper/conf/zoo.cfg

# The number of milliseconds of each tick
tickTime=2000

# The number of ticks that the initial
# synchronization phase can take
initLimit=10

# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
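# With tickTime=2000, initLimit=10 gives followers 10 x 2 s = 20 s to connect and
# sync with the leader at startup, and syncLimit=5 gives 10 s before a lagging
# follower is dropped from the ensemble.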

# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/home/ubuntu/data/zookeeper

# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
maxClientCnxns=60

# server.{id}={ip}:{peerPort}:{leaderPort}
server.1=broker1:2888:3888
server.2=broker2:2888:3888
server.3=broker3:2888:3888

Start ZooKeeper on each server and check the startup log:

./zookeeper/bin/zkServer.sh start
vi ./zookeeper/logs/zookeeper-ubuntu-server-ip-{xxx}.out

Once all three nodes are up, connect with zkCli to verify the ensemble:

./zookeeper/bin/zkCli.sh -server 10.0.75.183:2181,10.0.90.35:2181,10.0.71.136:2181
  • 2181 : port that zkCli or the Kafka brokers use to connect
  • 2888 : port used for communication between ZooKeeper nodes in the ensemble
  • 3888 : port used for leader election
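
Each node should also report its role in the ensemble (one leader, two followers). This can be checked on every server with:

./zookeeper/bin/zkServer.sh status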

3. Install the Kafka cluster

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.13-3.2.0.tgz
tar -xf ./kafka_2.13-3.2.0.tgz
mv ./kafka_2.13-3.2.0 ./kafka
rm ./kafka_2.13-3.2.0.tgz
mkdir -p ./data/broker

vi ./kafka/config/server.properties
  • ./kafka/config/server.properties
    ############################# Server Basics #############################
    
    # The id of the broker. This must be set to a unique integer for each broker.
    # broker ID for this node; use a different value on each broker (1, 2, 3)
    broker.id=1
    
    ############################# Socket Server Settings #############################
    
    # The address the socket server listens on. If not configured, the host name will be equal to the value of
    # java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
    #   FORMAT:
    #     listeners = listener_name://host_name:port
    #   EXAMPLE:
    #     listeners = PLAINTEXT://your.host.name:9092
    #listeners=PLAINTEXT://:9092
    
    # Listener name, hostname and port the broker will advertise to clients.
    # If not set, it uses the value for "listeners".
    #advertised.listeners=PLAINTEXT://your.host.name:9092
    
    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    
    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3
    
    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8
    
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    
    ############################# Log Basics #############################
    
    # A comma separated list of directories under which to store log files
    # (changed from the default) log.dirs=/tmp/kafka-logs
    log.dirs=/home/ubuntu/data/broker
    
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1
    
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    
    ############################# Internal Topic Settings  #############################
    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
    # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
    offsets.topic.replication.factor=3
    # replicate the transaction state log as well; must be >= transaction.state.log.min.isr
    transaction.state.log.replication.factor=3
    transaction.state.log.min.isr=2
    # topics must be created explicitly
    auto.create.topics.enable=false
    # with 3 replicas, acks=all writes require at least 2 in-sync replicas
    min.insync.replicas=2
    # allow topics to be deleted with the admin tools
    delete.topic.enable=true
    
    ############################# Log Flush Policy #############################
    
    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    #    1. Durability: Unflushed data may be lost if you are not using replication.
    #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.
    
    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000
    
    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000
    
    ############################# Log Retention Policy #############################
    
    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.
    
    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168
    
    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824
    
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    
    ############################# Zookeeper #############################
    
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    # ZooKeeper ensemble connection string for this cluster
    zookeeper.connect=10.0.75.183:2181,10.0.90.35:2181,10.0.71.136:2181
    
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=18000
    
    ############################# Group Coordinator Settings #############################
    
    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    # The default value for this is 3 seconds.
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    group.initial.rebalance.delay.ms=0

Start the broker on each server (same config on every node, with broker.id changed per node):

nohup ./kafka/bin/kafka-server-start.sh ./kafka/config/server.properties > broker.log &
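
If the brokers registered correctly, each one's ID should appear under /brokers/ids in ZooKeeper. A quick check using the zkCli installed earlier (passing ls on the command line runs that single command and exits):

# should list the three broker ids, e.g. [1, 2, 3]
./zookeeper/bin/zkCli.sh -server 10.0.75.183:2181,10.0.90.35:2181,10.0.71.136:2181 ls /brokers/ids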

How a client connects to the Kafka cluster

  1. The client can contact any broker in the cluster.
  2. The broker accepts connections that match the listeners=PLAINTEXT://{...}:9092 setting in server.properties,
    and then returns the cluster's connection information.
  3. From the broker contacted in (1), the client receives the cluster metadata and the addresses to use from then on.
    Those addresses are configured with advertised.listeners=PLAINTEXT://your.host.name:9092 in server.properties.
  4. The client then reconnects to the advertised addresses and carries out its work.
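
For example, on broker1 the two settings might look like the following. This is only a sketch: 10.0.75.183 is broker1's private IP taken from the /etc/hosts table above, and clients must be able to reach whatever address is advertised here.

# bind on every local interface
listeners=PLAINTEXT://0.0.0.0:9092
# address returned to clients in metadata responses
advertised.listeners=PLAINTEXT://10.0.75.183:9092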

4. Connection test

~/kafka/bin/kafka-topics.sh --create --bootstrap-server 10.0.75.183:9092,10.0.90.35:9092,10.0.71.136:9092 \
  --replication-factor 3 --partitions 3 --topic test

~/kafka/bin/kafka-topics.sh --bootstrap-server 10.0.75.183:9092,10.0.90.35:9092,10.0.71.136:9092 \
--list

~/kafka/bin/kafka-console-producer.sh --bootstrap-server 10.0.75.183:9092,10.0.90.35:9092,10.0.71.136:9092 \
  --topic test

~/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.0.75.183:9092,10.0.90.35:9092,10.0.71.136:9092 \
  --topic test --from-beginning
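
The topic layout can also be checked with --describe, using the same bootstrap servers; each of the three partitions should show a leader and three replicas spread across the brokers.

~/kafka/bin/kafka-topics.sh --describe --bootstrap-server 10.0.75.183:9092,10.0.90.35:9092,10.0.71.136:9092 \
  --topic test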

Everything connects and runs as expected.
