카프카 컨슈머

테크·2023년 1월 8일
0

카프카(Kafka)

목록 보기
3/8

컨슈머 동작

파티션과 컨슈머

컨슈머는 프로듀서가 카프카로 발행한 메시지를 가져와서 소비하는 역할을 담당합니다. 프로듀서가 아무리 빠르게 메시지를 발행해도 컨슈머가 소비하지 않으면 결국 처리가 지연되므로 컨슈머는 카프카 전체 동작에서 매우 중요한 부분을 담당합니다.

컨슈머가 카프카로 메시지를 발행하면 브로커는 세그먼트로 로컬 디스크에 메시지를 저장하게 됩니다. 컨슈머는 이를 읽어들여 로직을 처리합니다.

컨슈머는 그룹으로 묶이고 반드시 컨슈머 그룹에 포함되게 됩니다. 컨슈머 그룹은 각 파티션의 리더에게 메시지를 요청하게 됩니다. 가장 이상적인 상황은 컨슈머 그룹의 컨슈머 갯수와 파티션의 갯수가 1:1로 매칭되는 상황입니다.

컨슈머 갯수가 파티션보다 많아지면 연결되지 않은 컨슈머는 단순하게 자원을 소비하며 대기하게 되어서 처리량에 도움이 되지 않습니다. 컨슈머가 고장났을 때를 대비하여 액티브/스탠바이 형태로 생각할 수도 있는데 리밸런싱이 동작하면 다른 컨슈머가 고장난 컨슈머의 역할을 대신하므로 굳이 장애 상황을 대비해 컨슈머를 추가할 필요는 없습니다.

컨슈머 옵션

  • bootstrap.servers
    프로듀서의 옵션과 동일하게 호스트:포트 정보로 브로커의 정보를 입력합니다.
  • fetch.min.bytes
    한 번에 가져올 최소 데이터 크기로 데이터의 크기가 이보다 작다면 데이터가 모일때까지 대기합니다. fetch.max.wait.ms로 최대로 기다리는 시간을 설정합니다.
  • group.id
    컨슈머가 속한 컨슈머 그룹의 식별자입니다.
  • heartbeat.interval.ms
    카프카는 허트비트를 통해 컨슈머가 active 상태인지 아닌지 판단합니다. session.timeout.ms 전까지 허트비트를 보내지 않으면 종료된 것으로 판단해 그룹에서 제외됩니다. 일반적으로 session.timeout.ms보다 1/3 수준으로 설정합니다.
  • enable.auto.commit
    백그라운드로 주기적으로 오프셋을 커밋합니다
  • auto.offset.reset
    초기 오프셋이 없거나 오프셋이 더 이상 존재하지 않을 경우 초기화하는 옵션입니다.
    • earlist: 가장 초기의 오프셋 값으로 설정합니다
    • latest: 가장 마지막의 오프셋 값으로 설정합니다
    • none: 이전 오프셋 값을 찾지 못하면 에러를 나타냅니다
  • fetch.max.bytes
    한 번에 최대로 가져올 수 있는 bytes 크기를 의미합니다.
  • max.poll.records
    한 번의 poll() 요청으로 가져오는 최대 메시지 수입니다.

코드 수신방법

오토커밋

컨슈머의 기본 동작으로 가장 많이 사용되는 설정으로 오프셋을 주기적으로 커밋해서 오프셋 관리가 편하지만 컨슈머 종료나 오류가 빈번히 일어나면 메시지를 못 가져오거나 중복되는 현상이 발생하게 됩니다.

동기 가져오기

프로그래머가 poll()로 메시지를 가져오고 처리 이후에 동기적으로 commit을 호출하는 방법입니다. 속도는 느리지만 메시지 손실은 거의 발생하지 않습니다. 메시지 손실이 없어야 하는 중요한 처리일경우 사용하지만 메시지 중복 이슈는 여전히 존재합니다.

비동기 가져오기

프로그래머가 poll()로 메시지를 가져오고 처리 이후에 비동기적으로 commit을 호출하는 방법입니다. 동기 방식과 다르게 커밋에 실패해도 재시도하지 않습니다.

컨슈머 오프셋

컨슈머는 메시지의 위치를 오프셋으로 관리하는데 이 정보는 숫자로 나타나며 토픽에 저장됩니다. 저장되는 토픽은 __consumer_offsets 이며, 컨슈머 그룹이 컨슈머 그룹과 토픽, 파티션 등을 오프셋과 같이 저장하게 됩니다.

그룹 코디네이터

컨슈머는 하나의 컨슈머 그룹에 속하게 되며 그룹 내의 컨슈머들과 정보를 공유하며 동작합니다. 컨슈머는 그룹에서 나갈 수도 있고 새롭게 합류할 수도 있습니다. 컨슈머 그룹은 이러한 변화를 감지하고 작업을 균등하게 분배해야 하는데 이러한 작업을 컨슈머 리밸런싱이라 부릅니다. 이러한 작업을 수행하기 위해 컨슈머 그룹은 그룹 코디네이터가 존재합니다.

그룹 코디네이터는 컨슈머 그룹이 구독한 토픽의 파티션들과 그룹의 멤버들을 트래킹하고, 리밸런싱 동작을 수행합니다. 그룹 코디네이터는 컨슈머 그룹별로 존재하며, 카프카 클러스터 내의 브로커 중 하나에 위치하게 됩니다.

컨슈머 그룹 등록 과정

  1. 컨슈머는 브로커에게 클라이언트와 초기 커넥션을 연결하기 위한 요청 전송
  2. 요청을 받은 브로커는 그룹 코디네이터를 생성하고 컨슈머에게 응답
  3. 그룹 코디네이터는 group.initial.rebalance.delay.ms의 시간 동안 컨슈머의 요청 대기
  4. 컨슈머는 컨슈머 등록 요청을 그룹 코디네이터에 전송. 가장 먼저 전송한 컨슈머가 컨슈머 리더가 됩니다
  5. 컨슈머 등록 요청을 받은 그룹 코디네이터는 해당 컨슈머 그룹이 구독하는 토픽 파티션 리스트 등 리더 컨슈머의 요청에 응답을 보냅니다
  6. 리더 컨슈머는 정해진 컨슈머 파티션 할당 전략에 따라 그룹 내 컨슈머들에게 파티션을 할당한 뒤 그룹 코디네이터에게 전달합니다.
  7. 그룹 코디네이터는 해당 정보를 캐시하고 각 그룹 내 컨슈머들에게 성공을 알립니다
  8. 각 컨슈머들은 각자 지정된 토픽 파티션으로부터 메시지들을 가져옵니다

스태틱 멤버십

하드웨어 업데이트나 점검 등의 이유로 컨슈머 그룹 내의 컨슈머를 순차적으로 재시작하고 싶을 때가 있는데 컨슈머는 leave, join 시마다 리밸런싱이 일어나 컨슈머가 전체 정지할 수 있습니다.

이러한 일을 방지하기 위해 스태틱 멤버십이 추가되었습니다. 컨슈머마다 인식할 수 있는 아이디(group.instance.id)를 적용해서 컨슈머 그룹을 떠나거나 새로 합류할 때마다 리밸런싱이 일어나기 않게 합니다.

컨슈머 파티션 할당 전략

레인지 파티션 할당 전략

파티션 할당 전략의 기본값으로 토픽별로 할당 전략을 사용합니다.
먼저 구독하는 토픽에 대한 파티션을 순서대로 나열한 후 컨슈머를 순서대로 정렬합니다. 그런 다음 각 컨슈머가 몇 개의 파티션을 할당해야 하는지 전체 파티션 수를 컨슈머 수로 나누고 할당합니다. 만약 정확히 나누어떨어지지 않는다면 앞쪽의 컨슈머가 추가로 파티션을 할당받게 됩니다.

레인지 파티션 할당 전략은 동일한 메시지 키를 사용하고 하나의 컨슈머 그룹이 동일한 파티션 수를 가진 2개 이상의 토픽을 컨슘할 때 유용합니다.
토픽 1의 키 abc와 토픽 2의 키 abc가 동일한 파티션에 할당되므로 하나의 컨슈머가 두 토픽의 파티션을 컨슘하게 됩니다.

라운드 로빈 파티션 할당 전략

단순히 토픽별로 파티션을 나열하고 컨슈머마다 나누어가며 할당하게 됩니다.

스티키 파티션 할당 전략

라운드 로빈은 리밸런싱이 일어날 때 파티션과 컨슈머가 이전과 동일하게 매핑되지 않는데, 최대한 원래 매핑을 고수하고 새롭게 매핑되어야할 것들만 리밸런싱하는 방법입니다.

다음과 같은 규칙에 따라 재할당 동작을 수행합니다.

  • 컨슈머들의 최대 할당된 파티션의 수의 차이는 1
  • 기존에 존재하는 파티션 할당은 최대한 유지
  • 재할당 동작 시 유효하지 않은 모든 파티션 할당은 제거
  • 할당되지 않은 파티션들은 균형을 맞추는 방법으로 컨슈머들에 할당

협력적 스티키 파티션 할당 전략

기존의 리밸런싱 동작은 EAGER 리밸런스 프로토콜을 사용했는데 이 프로토콜은 리밸런싱 시 컨슈머에 할당된 모든 파티션을 취소하고 이후에 할당하는 전략을 취했습니다.

협력적 스티키 파티션 할당 전략은 기본적으로 스티키 파티션 할당 전략과 동일하지만, COOPERATIVE 프로토콜을 적용해 여러번에 걸쳐 리밸런싱을 동작해 리밸런싱이 적용되어야 할 파티션의 컨슈머 매핑만 끊고 새롭게 연결하는 방식으로 동작합니다.

실전 카프카 개발부터 운영까지 3장, 6장

profile
공부하는 개발자

0개의 댓글