아파치 카프카(Apache Kafka)의 컨슈머(Consumer) [9]

busybean3·2021년 9월 7일
0

카프카

목록 보기
9/13

이번 포스팅을 통해서 아파치 카프카의 컨슈머(Consumer)에 대해 알아보겠습니다.

1. 컨슈머(Consumer)란?

여기서 컨슈머는 토픽의 메시지를 가져와서 소비(consume)하는 역할을 하는 애플리케이션, 서버 등을 지칭하여 컨슈머라 일컫습니다.

1-1. 컨슈머의 주요 기능은?

이 컨슈머의 주요 기능은 특정 파티션을 관리하고 있는 파티션 리더에게 메시지를 가져오기 요청하는 것 입니다. 각 요청은 로그의 오프셋을 명시하고 그 위치로부터 로그 메시지를 수신합니다. 그래서 컨슈머는 가져올 메시지의 위치를 조정할 수 있고, 필요하다면 이미 가져온 데이터도 다시 가져올 수 있습니다. 이렇게 이미 가져온 메시지를 다시 가져올 수 있는 기능은 래빗엠큐(RabbitMQ)와 같은 일반적인 메시지큐 솔류션에서는 제공하지 않는 기능입니다. 하지만 최근에는 메시지큐 솔루션 사용자들에게 이러한 기능이 오히려 필수 기능으로 자리 잡고 있습니다.

1-2. 컨슈머의 종류

카프카에서 컨슈머라고 불리우는 컨슈머는 두 가지 종류가 있다고 볼 수 있습니다.

  1. 올드 컨슈머 (Old consumer) : 컨슈머의 오프셋을 주키퍼의 지노드에 저장하는 방식
  2. 뉴 컨슈머 (New consumer) : 컨슈머의 오프셋 저장을 주키퍼가 아닌 카프카의 토픽에 저장하는 방식

최신의 컨슈머 클라이언들 대부분은 New consumer로 구현되어 있습니다.

2. 컨슈머의 주요 명령어

2-1. 컨슈머 콘솔 명령어

보낸 메시지가 잘 전송되었는지 메시지를 가져와서 확인하는 명령어를 확인해보겠습니다.

위치는 동일하며, 명령어는 kafka-console-consumer.sh 입니다.

option

  • --bootstrap-server
  • --topic : 메시지를 확인하고자 하는 토픽 이름 작성
  • --from-beginning

command

## 컨슈머 콘솔 명령어
/usr/local/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server peter-kafka001:9092,peter-kafka002:9092,peter-kafka003:9092 \
--topic peter-topic \
--from-beginning

그리고 컨슈머를 생성하게 되면 컨슈머 그룹도 생성되는데 이를 확인하는 명령어는 아래와 같습니다.

## 컨슈머 그룹 확인 콘솔 명령어
/usr/local/kafka/bin/kafka-consumer-group.sh \
--bootstrap-server peter-kafka001:9092,peter-kafka002:9092,peter-kafka003:9092 \
--list

만약 컨슈머의 그룹 이름을 지정하고 싶다면 아래와 같은 명령어를 사용합니다.

추가되는 옵션은 --group 으로 뒤에 그룹 이름을 설정하면됩니다.

## 컨슈머 그룹 이름이 추가된 콘솔 명령어
/usr/local/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server peter-kafka001:9092,peter-kafka002:9092,peter-kafka003:9092 \
--topic peter-topic \
--group peter-consumer-group \
--from-beginning

2-2. 컨슈머로 메시지 소비-Python

이번에는 콘솔을 통해서 메시지를 소비하는 것이 아닌 python code를 통해서 소비하는 기본적인 방법을 알아보겠습니다. 여러 샘플들이 존재하는데 reference는 https://github.com/dpkp/kafka-python입니다. (https://github.com/confluentinc/confluent-kafka-python 도 있습니다.)

code

from kafka import KafkaConsumer
from json import loads

# topic, broker list
consumer = KafkaConsumer('peter-topic', bootstrap_servers=['peter-kafka001:9092','peter-kafka002:9092','peter-kafka003:9092'], enable_auto_commit=True, group_id='peter-consumer-group', value_deserializer=lambda x: loads(x.decode('utf-8')) )


print('[begin] get consumer list')
for message in consumer:
    print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % ( message.topic, message.partition, message.offset, message.key, message.value ))


print('[end] get consumer list')

3. 컨슈머의 주요 옵션

  • bootstrap.server : 카프카 클러스터에 처음 연결을 하기 위한 호스트와 포트 정보로 구성된 리스트 정보를 나타냅니다.
  • fetch.min.bytes : 한번에 가져올 수 있는 최소 데이터 사이즈입니다. 만약 지정한 사이즈보다 작은 경우, 데이터가 누적될 때까지 기다립니다.
  • group.id : 컨슈머가 속한 컨슈머 그룹을 식별하는 식별자입니다.
  • enable.auto.commit : 백그라운드로 주기적으로 오프셋을 커밋합니다.
  • auto.offset.reset : 카프카에서 초기 오프셋이 없거나 현재 오프셋이 더 이상 존재하지 않은 경우에 다음 옵션으로 리셋합니다.
    • earliest : 가장 초기의 오프셋값으로 설정
    • latest : 가장 마지막의 오프셋값으로 설정
    • none : 이전 오프셋값을 찾이 못하면 에러
  • fetch.max.bytes : 한번에 가져올 수 있는 최대 데이터 사이즈
  • reques.timeout.ms : 요청에 대해 응답을 기다리는 최대 시간
  • session.timeout.ms : 컨슈머와ㅓ 브로커 사이의 세션 타임 아웃 시간. 브로커가 컨슈머가 살아있는것으로 판단하는 시간으로, 만약 컨슈머가 그룹 코디네이터에게 하트비트를 보내지 않고 시간이 지나면 해당 컨슈머는 종료되거나 장애가 발생한 것으로 판단하고 컨슈머 그룹은 리밸런스를 시도합니다.
  • heartbeat.interval.ms : 그룹 코디네이터에게 얼마나 자주 KafkaConsumer poll() 메소드로 하트비트를 보낼 것인지 조정
  • max.poll.records : 단일 호출 poll()에 대한 최대 레코드 수를 조정
  • max.poll.interval.ms : 컨슈머가 살아있는지를 체크하기 위해 하트비트를 주기적으로 보내는데, 컨슈머가 계속해서 하트비트만 보내고 실제로 메시지를 가져가지 않는 경우가 있을 수도 있습니다. 이러한 경우 컨슈머가 무한정 해당 파티션을 점유할 수 없도록 주기적으로 poll을 호출하지 않으면 장애라고 판단하고 컨슈머 그룹에서 제외한 후 다른 컨슈머가 해당 파티션에서 메시지를 가져갈 수 있게 합니다.
  • auto.commit.interval.ms 주기적으로 오프셋을 커밋하는 시간
  • fetch.max.wait.ms : fetch.min.bytes에 설정된 데이터보다 적은 경우 요청에 응답을 기다리는 최대 시간

REFERENCE

해당 글의 모든 레퍼런스는 "카프카, 데이터 플랫폼의 최강자" (고승범, 공용준 지음) 을 알립니다.

https://coupa.ng/b5xV58

"이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다."

profile
엉덩이 무거운 개발자가 되기 위해서 몸무게를 찌웠다...

0개의 댓글