Kafka 기본개념 정리

솔트커피·2022년 8월 14일
0

출처 : https://jyeonth.tistory.com/30

개요

Kafka는 가장 널리 쓰이는 메시지 큐 솔루션 중 하나입니다.

다른 메시지 큐와 마찬가지로, Producer가 메시지를 publish하며 Consumer가 subscribe하며 메시지를 가져가게 됩니다.

다만 이 사이에 Topic / Partition / Consumer Group과 같은 개념이 등장하게 됩니다.

Message Order within Topic / Partition

Kafka 에서는 Producer는 topic에 메시지를 보내고, 하나의 Topic은 한 개 이상의 Partition으로 나뉘어지게 됩니다. 이는 topic을 생성하는 시점에 명시할 수 있습니다.

$ /usr/local/kafka/bin/kakfa-topic.sh \ 
--zookeeper $LIST_OF_ZK_NODES --topic my-topic --partitions 3 --replication-factor 2 --create
Created topic "my-topic".

partition은 Topic을 분할한 단위이며, partition이 여러개일 경우 producer가 보낸 메시지의 순서는 보장될 수 없지만, 각 partition 안에서 메시지는 순서가 보장됩니다.

아래의 예시를 살펴 봅시다.

  • 위 그림에서, Producer는 1부터 9까지의 순서로 메시지를 publish 했습니다.
  • Consumer는 각 partition마다 가지고 있는 offset(latest : 마지막으로 읽어들인 record 기록)부터 차례로 메시지를 읽어옵니다.
  • 하지만 하나의 Consumer가 이 세 개의 partition을 모두 subscribe한 경우, 높은 확률로 메시지는 원래 전송된 순서로 오지 않습니다. Consumer가 세 개의 partition을 동시에 바라보기 때문입니다.
  • 각 partition에서 한 개씩 돌아가면서 메시지를 가져온다고 해도 1 -> 3 -> 4-> 2-> 5 -> 6 -> 7 -> 9 -> 8의 순서로 메시지가 도착하게 될 것입니다.
  • 하지만, 하나의 partition 내에서는 메시지의 순서가 보장됩니다. 즉, partition 1에서의 1, 2, 7, 8 / partition 2에서의 3, 5 / partition 3에서의 4, 6, 9는 반드시 이 순서대로 도착하게 됩니다.
  • 메시지는 key에 따라 각자 다른 partition으로 assign 되도록 설정할 수 있으므로, 이는 메시지의 순서가 중요한 경우 유용하게 사용될 수 있습니다.

예를 들어, 특정 유저의 action이 A -> B -> C의 순서로 발생하고, 이를 Kafka Consumer가 반드시 이 순서로만 subscribe 해야 한다면 userId를 partition key로 두어 특정 유저의 action은 모두 같은 partition으로 들어가도록 하여 순서를 보장할 수 있습니다.

이처럼 메시지의 순서가 중요한 경우, partition의 수를 1로 두거나, 순서가 보장되어야 하는 메시지는 같은 partition에 들어가도록 key를 적절히 설정해야 합니다.

Consumer Group

기존의 Message Queue 솔루션에서는 컨슈머가 메시지를 가져가면, 해당 메시지는 큐에서 삭제됩니다.
즉, 하나의 큐에 대하여 여러 컨슈머가 붙어서 같은 메시지를 컨슈밍할 수 없습니다.
하지만 Kafka는 컨슈머가 메시지를 가져가도 큐에서 즉시 삭제가 되지 않으며, 하나의 토픽에 여러 컨슈머 그룹이 붙어 메시지를 가져갈 수 있습니다.

또한 각 consumer group마다 해당 topic의 partition에 대한 별도의 offset을 관리하고, group에 컨슈머가 추가/제거 될 때마다 rebalancing을 하여 group 내의 consumer에 partition을 할당하게 됩니다.

이는 컨슈머의 확장/축소를 용이하게 하고, 하나의 MQ를 컨슈머 별로 다른 용도로 사용할 수 있는 확장성을 제공합니다.

  • 위 그림의 경우 하나의 토픽에 jyeon이라는 consumer group과 kim이라는 consumer group 두 개가 붙었으며, 파란색 화살표는 jyeon 그룹의 오프셋, 빨간색 화살표는 kim 그룹의 오프셋입니다.
  • 즉, 같은 topic에 대해 consume해도 jyeon 그룹은 각 partition에서 7, 5, 9를 읽어들일 것이고, kim 그룹은 2, 5, 6을 읽어들일 것 입니다.
  • 두 그룹이 메시지를 모두 읽어간 이후에도 메시지는 사라지지 않은 상태로 얼마간 디스크에 기록된 상태로 남아있으므로, 다른 consumer group이 topic을 처음부터 읽어갈 수도 있고, kim 그룹의 offset을 리셋시켜 맨 처음 / 중간 원하는 offset부터 읽어들일 수도 있습니다.

Partition - Consumer Assignment

특정 topic을 subscribe하는 consumer는 이 중 하나 이상의 partition에 할당하여 메시지를 가져오게 됩니다.

  • 단, 하나의 partition에는 동일한 Consumer Group에서 반드시 하나의 Consumer만 할당이 됩니다. (Partition : Consumer = N : 1 관계)
  • 반대로, Topic의 partition은 두 개이지만, Consumer는 세 개인 경우, 하나의 Consumer는 assign된 partition 없이 아무 일도 하지 않고 대기만 하게 됩니다.

이와 같은 상황을 방지하기 위하여, partition과 consumer 수의 적절한 조절이 필요합니다.

Commit / Offset Management

  • Cosumer가 poll()을 호출할 때마다, 컨슈머 그룹은 카프카에 저장되어 있는 아직 읽어오지 않은 메시지를 가져옵니다.
  • 각 topic의 partition마다 어디까지 읽었는지의 정보는 offset이라는 이름으로 broker에 기록이 됩니다. (offset 정보는 0.8.x까지는 zookeeper에서 관리되었고, 0.9부터는 broker에 저장됩니다.)
    이 정보는 offsets.retention.minutes(default: 1440m = 24h)에 명시된 시간 동안 저장이 되며, 이 기간이 지나면 consumer offset이 리셋되고, auto.offset.reset에 명시된 offset으로 초기화됩니다.

모든 컨슈머들이 살아있고 제대로 메시지를 polling해가고 있는 동안은 아무런 문제가 없지만, 컨슈머 인스턴스 중 하나가 다운되거나 컨슈머 그룹에 새로운 컨슈머가 들어오게 된다면 컨슈머 그룹 내에서 rebalance가 일어나게 됩니다.

이 때 각각의 컨슈머는 이전에 처리했던 토픽의 파티션이 아닌, 다른 파티션에 할당되며, 새로 할당된 파티션에 대해 가장 최근에 커밋된 오프셋을 읽어 그 다음부터 메시지를 가져오기 시작합니다.

Consumer Health Check & Rebalancing

각 Consumer는 자신이 살아있다는 것(Partition에 대한 소유권을 주장)을 그룹 코디네이터에게 알려주기 위해 heartbeat.interval.ms(default: 3s)주기로 heartbeat를 보냅니다.

만약 session.timeout.ms(default: 10s)에 설정된 시간이 지나도록 특정 consumer가 heartbeat를 보내지 않으면, 해당 컨슈머는 종료되었거나 무언가 문제가 생긴 것으로 인지하고 컨슈머 그룹은 reblance를 시도하게 됩니다.

session.timeout.ms가 짧은 경우 장애를 빨리 인지할 수 있지만, 너무 짧게 설정된 경우 Full GC가 오래걸리는 경우 등의 이유로 원치 않은 rebalance가 일어날 수도 있습니다.

또한, consumer가 heartbaet만 보내고 실제로 topic에서 메시지를 가져가지 않는 경우가 생길수도 있습니다.
이 경우, 코디네이터는 해당 consumer를 정상 상태로 인지하기 때문에 계속 partition을 점유하고 있지만, 해당 partition에서 메시지는 하나도 consuming이 되지 못하는 상태가 됩니다.
이를 방지하기 위해, max.poll.interval.ms(default: 300000ms)를 설정합니다.

컨슈머가 heartbeat를 보내더라도, max.poll.interval.ms내에 poll()을 호출하지 않으면 해당 컨슈머를 그룹에서 제외하고 다른 컨슈머에서 해당 파티션에서 메시지를 컨슈밍 할 수 있도록 rebalance를 합니다.

많은 Kafka Library들은 실패한 메시지에 대해 retry 기능을 지원합니다. 이 retry 횟수가 많고 간격이 긴 경우, 해당 메시지를 retry하다가 다음 메시지에 대한 poll()을 호출하지 못하게 된다.

이러한 상황에서 max.poll.interval.ms만큼의 시간이 지나면 consumer는 rebalancing 되어 다른 consumer가 해당 partition을 처리하게 됩니다.

How Consumer Offset is Managed

Consumer commit은 자동 커밋 / 수동 커밋 두 가지 방법으로 관리할 수 있다.

자동 커밋

자동 커밋을 사용하려면 컨슈머 옵션 중 enable.auto.commit을 true로 설정해주어야 합니다.
이 경우, 컨슈머에서 poll()을 호출할 때 auto.commit.interval.ms(default: 5s)이 지났는지를 확인하고, 커밋할 때가 되었으면 가장 마지막 offset을 commit 합니다.

다만 주의해야 할 점은 중복 컨슈밍이나 메시지 손실이 발생할 수 있다는 것입니다. 예를 들어 auto.commit.interval.ms이 5초이고, 이 5초가 지나기 전에 consumer group rebalancing이 일어난 경우를 생각해봅시다.

  • 위 그림에서 커밋된 offset은 2초이고, 8번 offset을 처리하는 도중 consumer session timeout이 발생하던지 consumer가 추가 / 제거 되는 등의 이유로 rebalance가 되었다고 가정합시다.
  • 7번 offset은 이미 처리가 완료되었지만, offset 2번 이후로 offset commit이 되지 않았으므로, rebalance 후 해당 partition에 할당된 consumer가 consume을 시작할 때에는 7번부터 다시 consume하게 됩니다.
  • 즉, 중복 consuming이 발생할 가능성이 있다는 것입니다. auto.commit.interval.ms를 줄여서 이런 상황이 발생할 확률을 낮출 수는 있겠지만, 해당 가능성은 항상 존재합니다.
  • 또한 자동 커밋의 경우, poll()을 호출하는 시점에 auto.commit.interval.ms가 지났으면 마지막 offset을 commit하게 됩니다. 이 경우, commit은 되어 버렸지만 아직 메시지의 처리가 끝나지 못한 상태에서 컨슈머에 장애가 발생하면 해당 메시지는 손실될 수 있습니다.

수동 커밋

수동 커밋은 메시지 처리가 완전히 완료되기 전까지 메시지를 가져온 것으로 간주되면 안되는 경우 사용합니다.
자동 커밋에서 설명한 메시지 손실을 방지하기 위해, 메시지의 처리가 완전히 끝났다고 마킹할 수 있는 시점에 해당 offset을 커밋할 수 있습니다.

하지만 수동 커밋의 경우에도, 작업을 처리하다가 에러가 나는 경우 중복이 발생할 수 있습니다.

  • 예를 들어 메시지 처리 과정이 A -> B -> C -> commit일 때 B에서 장애가 난 경우, 해당 메시지는 commit이 되지 않아 재처리가 될 것이기 때문에 A 과정은 중복처리가 될 수 있습니다.
  • 이와 같은 특성을 고려하여 각 operation을 최대한 idempotent(멱등)하게 동작할 수 있도록 고려하여 설계해야 합니다.
  • 또 한가지 주의할 점은, consumer 측에서 retry로 처리할 수 없는 메시지가 들어온 경우(ex. parsing이 불가능한 메시지) 해당 메시지는 절대로 처리될 수 없고, 이 offset이 영원히 커밋되지 못하기 때문에 다음 메시지들이 처리되지 못하고 offset이 잘못된 메시지에 막혀있는 상황이 발생할 수도 있습니다.
    이런 경우 처리할 수 없는 메시지를 다른 토픽으로 보내는 DLQ 등을 고려하여 설계해야 합니다.

0개의 댓글