[Kafka] Kafka Consumer

CHAN LIM·2024년 1월 29일
0

Kafka

목록 보기
7/13


Kafka Consumer

Topic에 메시지를 읽어 들인다.

여러 개의 Consumer들로 구성된 경우
어떤 브로커의 파티션에서 메시지를 읽어들일지전략적 (Key)으로 결정

모든 Consumer들은 단 하나의 Consumer Group에 소속되어야 하며,
Consumer Group은 1개 이상의 Consumer를 가질 수 있다.

파티션의 레코드들은 단 하나의 Consumer에만 할당된다.

Kafka Topic에 저장된 데이터를 가져오는 애플리케이션, 서버

저장된 데이터를 가져오는 행위를 Read 읽는다고 부르기도 하고
Subscribe 구독한다고도 한다.


Polling 구조

Consumer는 Broker로부터 메시지를 요청하고,
Broker는 해당 Consumer에게 메시지를 전송한다.

  • 메시지 큐와 다르다.

    • 메시지 큐들은 메시지를 푸시 하는 방식
  • Consumer가 원하는 만큼 Kafka Broker에게 요청


Multi-app Consumption

하나의 Topic에 대해
서로 다른 애플리케이션이 각자의 데이터를 가져와서 읽을 수 있다.

  • 메시지가 삭제되지 않기 때문에 가능하다.

  • Consumer Offset이라는 Topic에,
    각 애플리케이션들이 어디까지 읽었는지 저장

  • 갑자기 Consumer가 동작이 멈추더라도 Offset을 보고 재개할 수 있다.

Consumer Group

하나 이상의 Consumer들이 Groupd을 이룰 수 있다.

  • 하나의 Topic에 있는 데이터를 여러 애플리케이션 Consumer Group에서 가져갈 수 있다.

Consumer Group Advanced


비교


Consumer 수 < Topic partition 수

Consumer 수 = Topic partition 수

Consumer 수 > Topic partition 수


Coordinator and leader discovery

Consumer Group 형태의 애플리케이션과 Kafka의 Handshake 관리를 위해,
Kafka 측의 CoordinatorConsumer Group의 Leader선출되어야 한다.

  • 프로세스를 시작하는 첫 번째 Consumer는 자동으로 Consumer Group에서 Leader로 선출된다.

Consumer가 Consumer Group에 가입하기 위해,

1. Coordinator 찾기

Consumer Group을 생성 혹은 가입하기 위해,
Kafka 측의 Coordinator를 찾아야한다.


2. Join Group

Coordinator를 찾으면,
Join Group 요청을 보낸다.

Coordinator는 Group의 Leader와 메타데이터를 내보낸다.


3. Sync Group

Rebalancing 작업의 트리거


4. Rebalancing

Consumer가 추가/제거되거나 "Sync Group" 요청이 전송되면 Consumer Group의 모든 Consumer는 업데이트된 파티션 할당을 받게 된다.

  • Rebalancing은 Consumer의 소유권을 재조정하는 일

  • Rebalancing이 발생한 Consumer Group 내의 모든 Consumer들은
    읽기 작업이 모두 중단된다.

    • 즉, Consumer 측의 일시적인 서비스 중단 발생 가능
    • 실제로 파티션 개수를 조정하면서 자주 발생
      • Topic Partition 조정 -> Consumer Rebalancing
  • Rebalancing은 Consumer의 멤버 변화는 물론 파티션 수가 증가할 때도 발생한다.

  • Consumer, Partition의 추가가 필요하면,
    충분한 고려 후 동작해야 한다.


5. Heartbeat

Consumer Group내의 각 Consumer는 주기적으로 Heartbeat 신호를 Coordinator에게 보낸다.

  • 시간 초과
    • Lost 또는 Rebalancing

6. Leave Group


Consumer Lag

Topic의 최신 Offset과 Consumer Offset 간의 차이

  • Consumer Lag을 통해 Consumer의 상태를 파악할 수 있다.

Options

  • bootstrap.servers
    • Producer와 동일하게 Broker의 정보 제공
  • fetch.min.bytes
    • 한 번에 가져올 수 있는 최소 데이터 크기
    • 만약 지정한 크기 보다 작다면,
      요청에 응답하지 않고 데이터가 누적될 때 까지 기다린다.
  • group.id
    • Consumer가 속한 Consumer Group을 식별하는 식별자
    • 동일한 그룹 내의 Consumer 정보는 모두 공유
  • heartbeat.interval.ms
    • 하트비트가 있다는 것은 Consumer의 상태는 Active
    • session.timeout.ms와 밀접한 관계가 있다.
      • session.timeout.ms보다 낮은 값으로 설정해야한다.
      • 보통 session.timeout.ms의 1/3 값 설정
  • session.timeout.ms
    • 이 시간을 이용하여 Consumer가 종료된 것인지 판단
      • 이 시간 전까지 하트비트를 보내지 않았다면,
        해당 Consumer는 종료된 것으로 간주하고 Consumer Group에서 제외. Rebalancing 시작
  • max.partition.fetch.bytes
    • 파티션당 가져올 수 있는 최대 크기
  • enable.auto.commit
    • 백그라운드로 주기적으로 Offset 커밋
  • auto.offset.reset
    • Kafka에 초기 Offset이 없을 경우, 또는
      현재 Offset이 더 이상 존재하지 않는 경우에 아래 옵션으로 reset :
      • earliest
        • 가장 초기 Offset 값으로 설정
      • latest
        • 가장 마지막의 Offset 값으로 설정
      • none
        • 이전 Offset 값을 찾지 못하면 에러
  • fetch.max.bytes
    • 한 번의 fetch 요청으로 가져올 수 있는 최대 크기
  • group.instance.id
    • Consumer의 고유 식별자
    • 설정 시, static 멤버로 간주되어 불필요한 Rebalancing X
  • max.poll.records
    • 한 번의 poll() 요청으로 가져오는 최대 메시지 수
  • fetch.max.watt.ms
    • fetch.mln.bytes에 의해 설정된 데이터보다 적은 경우
      요청에 대한 응답을 기다리는 최대 시간

출처

Kafka Producer Overview

profile
클라우드, 데이터, DevOps 엔지니어 지향 || 글보단 사진 지향

0개의 댓글