Kafka Consumer Rebalancing 관리

개발 공부 일지·2022년 1월 7일
0

Kafka Consumer Group Coodinator는 Kafka Consumer 연결을 관리하면서, Consumer가 죽었다고 생각하는 경우가 2가지 있다.
1. session-timeout-ms 이내에 heartbeat를 한번도 받지 못한 경우
2. max-poll-interval-ms 이내에 poll 명령을 수행하지 않은 경우
이 2가지 경우에 Consumer가 죽었다고 생각해 Rebalancing을 수행한다.

이 Rebalancing 과정은 해당 Topic에 연결되어 있던 Consumer들을 다 끊고, 다시 Consumer로부터 연결 요청을 받아 Partition을 할당하는 과정이라서 overhead가 크다.

그런데, Consumer를 잠깐 reboot 하는 과정에서는 Rebalancing 과정을 회피하고 싶을 수도 있다. (Consumer Application source version 관리나 오류로 인해 reboot하거나 등등)
이럴 때, 일정 시간 동안은 Rebalancing이 일어나지 않도록 관리하는 방법이 있다. (Static Membership)

설정 자체는 간단하다.
1. Kafka Consumer 설정 중 group-instance-id (ConsumerConfig.GROUP_INSTANCE_ID_CONFIG) 를 unique한 값으로 설정
2. 예상되는 Consumer reboot 시간보다 session.timeout.ms와 max-poll-interval-ms를 더 크게 주면 된다.

  • 이에 맞게 heartbeat.interval.ms도 설정해주는 것이 좋다. (보통 session.timeout.ms의 1/3 정도로)

group-instance-id를 설정해주면, Consumer가 새로 구동될 때마다 Group Coodinator로부터 group-instance-id-{UUID} 형태로 member id를 할당 받아 Kafka Broker와 통신하게 된다. 연결이 끊겨도 설정한 session.timeout.ms & max-poll-interval-ms 안에 다시 연결만 하면, Rebalancing이 일어나지 않는다.

해당하는 로그는 Consumer Application Log와 해당 Topic Leader Kafka Broker의 server.log (kafka-logs가 아닌, kafka_home 경로의 logs에 남는 server.log)에서 확인할 수 있다. log상 ip주소는 {ipAddress}로 대치 했다.

Log 예시)
group-instance-id = CONSUMER_01
topic = REACTING

  • Consumer Application Log
    해당 하는 group coordinator 찾아서, memberId 새로 할당 받아 연결된 log
    2022-01-07 01:57:35.911 INFO 29173 --- [REACTING-FR-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer instanceId=CONSUMER_01, clientId=REACTING, groupId=REACTING] Discovered group coordinator {ipAddress}:9092 (id: 2147483646 rack: null)
    2022-01-07 01:57:36.024 INFO 29173 --- [REACTING-FR-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer instanceId=CONSUMER_01, clientId=REACTING, groupId=REACTING] Successfully joined group with generation Generation{generationId=3, memberId='CONSUMER_01-ee2182bb-a9c8-4d25-a6ab-95df9f691c8f', protocol='range'}
  • Leader Kafka Broker의 server.log
    [2022-01-07 01:57:36,005] INFO [GroupCoordinator 1]: Static member with groupInstanceId=CONSUMER_01 and unknown member id joins group REACTING in Stable state. Replacing previously mapped member CONSUMER_01-b6e50bef-aed4-4e1d-97d7-55b4b3c90d79 with this groupInstanceId. (kafka.coordinator.group.GroupCoordinator)
    [2022-01-07 01:57:36,006] INFO [GroupCoordinator 1]: Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance. (kafka.coordinator.group.GroupCoordinator)

Kafka Broker의 log를 보면, 새로 할당된 unknown member id로 join group 요청이 와서 이전 연결을 대체했다는 것을 알 수 있고, Rebalancing이 일어나지 않았다는 것을 알 수 있다.
-> unknown member id는 Consumer Application Log에 있는 "CONSUMER_01-ee2182bb-a9c8-4d25-a6ab-95df9f691c8f"

Stable stage라는 단어가 나오는 데, 이는 Kafka Consumer Group Coodinator가 state로 Rebalancing 과정을 관리하는데, 이 중 Stable stage로 바로 신규 연결을 대체해서 Rebalancing이 일어나지 않음을 알 수 있다.

profile
알고리즘 / 기술 스택 / CS

0개의 댓글