출처 : https://jyeonth.tistory.com/30
Kafka는 가장 널리 쓰이는 메시지 큐 솔루션 중 하나입니다.
다른 메시지 큐와 마찬가지로, Producer가 메시지를 publish하며 Consumer가 subscribe하며 메시지를 가져가게 됩니다.
다만 이 사이에 Topic / Partition / Consumer Group과 같은 개념이 등장하게 됩니다.
Kafka 에서는 Producer는 topic에 메시지를 보내고, 하나의 Topic은 한 개 이상의 Partition으로 나뉘어지게 됩니다. 이는 topic을 생성하는 시점에 명시할 수 있습니다.
$ /usr/local/kafka/bin/kakfa-topic.sh \
--zookeeper $LIST_OF_ZK_NODES --topic my-topic --partitions 3 --replication-factor 2 --create
Created topic "my-topic".
partition은 Topic을 분할한 단위이며, partition이 여러개일 경우 producer가 보낸 메시지의 순서는 보장될 수 없지만, 각 partition 안에서 메시지는 순서가 보장됩니다.
아래의 예시를 살펴 봅시다.
예를 들어, 특정 유저의 action이 A -> B -> C의 순서로 발생하고, 이를 Kafka Consumer가 반드시 이 순서로만 subscribe 해야 한다면 userId를 partition key로 두어 특정 유저의 action은 모두 같은 partition으로 들어가도록 하여 순서를 보장할 수 있습니다.
이처럼 메시지의 순서가 중요한 경우, partition의 수를 1로 두거나, 순서가 보장되어야 하는 메시지는 같은 partition에 들어가도록 key를 적절히 설정해야 합니다.
기존의 Message Queue 솔루션에서는 컨슈머가 메시지를 가져가면, 해당 메시지는 큐에서 삭제됩니다.
즉, 하나의 큐에 대하여 여러 컨슈머가 붙어서 같은 메시지를 컨슈밍할 수 없습니다.
하지만 Kafka는 컨슈머가 메시지를 가져가도 큐에서 즉시 삭제가 되지 않으며, 하나의 토픽에 여러 컨슈머 그룹이 붙어 메시지를 가져갈 수 있습니다.
또한 각 consumer group마다 해당 topic의 partition에 대한 별도의 offset을 관리하고, group에 컨슈머가 추가/제거 될 때마다 rebalancing을 하여 group 내의 consumer에 partition을 할당하게 됩니다.
이는 컨슈머의 확장/축소를 용이하게 하고, 하나의 MQ를 컨슈머 별로 다른 용도로 사용할 수 있는 확장성을 제공합니다.
특정 topic을 subscribe하는 consumer는 이 중 하나 이상의 partition에 할당하여 메시지를 가져오게 됩니다.
이와 같은 상황을 방지하기 위하여, partition과 consumer 수의 적절한 조절이 필요합니다.
offsets.retention.minutes(default: 1440m = 24h)
에 명시된 시간 동안 저장이 되며, 이 기간이 지나면 consumer offset이 리셋되고, auto.offset.reset
에 명시된 offset으로 초기화됩니다.모든 컨슈머들이 살아있고 제대로 메시지를 polling해가고 있는 동안은 아무런 문제가 없지만, 컨슈머 인스턴스 중 하나가 다운되거나 컨슈머 그룹에 새로운 컨슈머가 들어오게 된다면 컨슈머 그룹 내에서 rebalance가 일어나게 됩니다.
이 때 각각의 컨슈머는 이전에 처리했던 토픽의 파티션이 아닌, 다른 파티션에 할당되며, 새로 할당된 파티션에 대해 가장 최근에 커밋된 오프셋을 읽어 그 다음부터 메시지를 가져오기 시작합니다.
각 Consumer는 자신이 살아있다는 것(Partition에 대한 소유권을 주장)을 그룹 코디네이터에게 알려주기 위해 heartbeat.interval.ms(default: 3s)
주기로 heartbeat를 보냅니다.
만약 session.timeout.ms(default: 10s)
에 설정된 시간이 지나도록 특정 consumer가 heartbeat를 보내지 않으면, 해당 컨슈머는 종료되었거나 무언가 문제가 생긴 것으로 인지하고 컨슈머 그룹은 reblance를 시도하게 됩니다.
session.timeout.ms
가 짧은 경우 장애를 빨리 인지할 수 있지만, 너무 짧게 설정된 경우 Full GC가 오래걸리는 경우 등의 이유로 원치 않은 rebalance가 일어날 수도 있습니다.
또한, consumer가 heartbaet만 보내고 실제로 topic에서 메시지를 가져가지 않는 경우가 생길수도 있습니다.
이 경우, 코디네이터는 해당 consumer를 정상 상태로 인지하기 때문에 계속 partition을 점유하고 있지만, 해당 partition에서 메시지는 하나도 consuming이 되지 못하는 상태가 됩니다.
이를 방지하기 위해, max.poll.interval.ms(default: 300000ms)
를 설정합니다.
컨슈머가 heartbeat를 보내더라도, max.poll.interval.ms
내에 poll()을 호출하지 않으면 해당 컨슈머를 그룹에서 제외하고 다른 컨슈머에서 해당 파티션에서 메시지를 컨슈밍 할 수 있도록 rebalance를 합니다.
많은 Kafka Library들은 실패한 메시지에 대해 retry 기능을 지원합니다. 이 retry 횟수가 많고 간격이 긴 경우, 해당 메시지를 retry하다가 다음 메시지에 대한 poll()을 호출하지 못하게 된다.
이러한 상황에서 max.poll.interval.ms
만큼의 시간이 지나면 consumer는 rebalancing 되어 다른 consumer가 해당 partition을 처리하게 됩니다.
Consumer commit은 자동 커밋 / 수동 커밋 두 가지 방법으로 관리할 수 있다.
자동 커밋을 사용하려면 컨슈머 옵션 중 enable.auto.commit
을 true로 설정해주어야 합니다.
이 경우, 컨슈머에서 poll()을 호출할 때 auto.commit.interval.ms(default: 5s)
이 지났는지를 확인하고, 커밋할 때가 되었으면 가장 마지막 offset을 commit 합니다.
다만 주의해야 할 점은 중복 컨슈밍이나 메시지 손실이 발생할 수 있다는 것입니다. 예를 들어 auto.commit.interval.ms
이 5초이고, 이 5초가 지나기 전에 consumer group rebalancing이 일어난 경우를 생각해봅시다.
auto.commit.interval.ms
를 줄여서 이런 상황이 발생할 확률을 낮출 수는 있겠지만, 해당 가능성은 항상 존재합니다.auto.commit.interval.ms
가 지났으면 마지막 offset을 commit하게 됩니다. 이 경우, commit은 되어 버렸지만 아직 메시지의 처리가 끝나지 못한 상태에서 컨슈머에 장애가 발생하면 해당 메시지는 손실될 수 있습니다.수동 커밋은 메시지 처리가 완전히 완료되기 전까지 메시지를 가져온 것으로 간주되면 안되는 경우 사용합니다.
자동 커밋에서 설명한 메시지 손실을 방지하기 위해, 메시지의 처리가 완전히 끝났다고 마킹할 수 있는 시점에 해당 offset을 커밋할 수 있습니다.
하지만 수동 커밋의 경우에도, 작업을 처리하다가 에러가 나는 경우 중복이 발생할 수 있습니다.