Kafka 3. Consumer, Producer
Consumer
Consumer groups and partition rebalance
- partition을 consumer들에게 재분배 하는 것을 rebalance라고 한다.
- rebalance는 두 가지 경우에 발생할 수 있다.
- consumer가 consumer group에서 합류/이탈하는 경우
- topic의 partition이 추가되는 경우
- reblance 전략은 크게 두 부류로 나뉜다.
- eager rebalance
- cooperative rebalance
- Kafka Consumer의 partition.assignment.strategy로 설정가능
Eager rebalance
- 짧은 시간동안 Consumer들이 동작을 멈추고, 동시에 partition에 대한 membership을 잃는다.
- 이것을 Stop world event라고 부른다.
- 정해진 전략에 따라 새로운 Partition을 할당받는다.
- Eager reblance를 취하면 두 가지 문제가 발생할 수 있다.
- 재할당이 일어나게 되면 partition이 consumer에게 무작위로 재할당 될 가능성이 있다.
- Consumer가 원래 소유하고 있던 partition에 대해서 우선권을 가지는 방식을 고려해볼 수 있다.
- Consumer group에 속한 Consumer들의 Stop world event는 비효율을 초래한다.
- 재할당 이전과 이후에 동일한 consumer에 할당되는 Partition에 대해선 재할당 과정을 생략하는 것을 고려해볼 수 있다.
- Eager rebalance의 종류
- RangeAssignor : topic 단위로 할당하기 때문에 균형을 맞추기 어렵다.
- RoundRobin : 모든 topic에 걸쳐서 할당하기 때문에 RangeAssignor에 비해 균형적이다.
- StickyAssignor : 처음에는 RoundRobin방식으로 할당하고 재할당이 일어날땐 파티션의 이동을 최소화하는 방식을 사용
Cooperative rebalance
- Incremental rebalance라고도 불림
- Eager rebalance의 문제점을 개선한 전략
- patition중 일부만 때어서 rebalance하는 방식
- 일부 partition을 제외한 재할당되지 않은 partition에 대해선 계속 작동됨
- 이 iteration을 반복하다보면 stable 상태로
그래서 협력적 리밸런싱이라고
- Cooperative rebalance의 종류
- CooperativeStickyAssignor
- StickyAssignor의 방식에 Cooperative rebalance방식을 적용
- 즉 재할당이 일어나지 않은 Partition에 대해선 계속 동작
how to use?
- Kafka Consumer
- Kafka 3.0의 default assignor는 [RangeAssignor,CooperativeStickyAssignor]
- 기본값으로 RangeAssignor만 쓰다가 RangeAssignor를 제거하면 CooperativeStickyAssignor를 사용
- Kafka Connect : Cooperative rebalance가 기본값
- Kafka Stream : StreamsPartitionAssignor를 기본값으로 사용
Static Group Membership
- consumer가 이탈해도 partition을 일정시간동안 다른 consumer에 할당하지 않는 방법
- consumerd의 group.instance.id를 설정하면 Consumer가 나가도 session.timeout.ms 만큼 다른 consumer에게 할당되지 않음
- Static Group Membership을 사용하면 단순히 consumer를 재시작하고 싶을때 불필요한 rebalance을 방지할 수 있음
- Kubernetes와 사용하면 유용
- consumer가 local state, cache를 유지해야 한다면 유용
- cache를 rebuilding해야하는 상황을 피할 수 있음
Consumer offset commit
- Java Consumer API는 at-least once senario가 default
- enable.auto.commit=True
- auto.commit.interval.ms=5000
- commit 요청을 보낸 직후의 poll 혹은 최초의 poll이 완료된 후 timer를 작동
- 5초가 지난 후 진행중인 poll이 완료되면 commitAsync 수행
- enable.auto.commit=False로 설정하면 명시적으로 commitSync, commitAsync를 호출해줘야함
- commitSync를 사용하면 exactly once를 구현할 수 있음
heartbeat & poll
- broker는 consumer의 down여부를 확인하기 위해 heartbeat 매커니즘과 poll 매커니즘을 활용한다.
- heartbeat
- 모든 Consumer는 자신의 상태를 알리기위해 주기적으로 coordinator에게 heartbeat를 쏜다.
- heartbeat.interval.ms (default 3000)
- heartbeat를 보내는 주기
- 보통 session.timeout.ms의 1/3로 설정
- session.timeout.ms (default Kafka >= 3.0 ? 45s:10s)
- session.timeout.ms 동안 heartbeat를 받지 못하면 해당 Consumer가 죽은것으로 간주
- 빠른 rebalance를 위해서 낮은 값으로 설정할 수 있다.
- poll
- max.poll.interval.ms (default 5m)
- consumer는 살아있는데 poll 요청이 해당 시간동안 오지 않으면 문제가 있다고 간주
- data processing에 문제가 있는지 판단하기 위해 사용
- max.poll.records (default 500)
- 한번의 Poll 요청으로 받을 수 있는 최대 record수
- message의 크기가 작고 RAM 여유가 있다면 increase, 반대면 decrease
- fetch.min.bytes (default 1)
- 한번의 요청으로 poll해오는 데이터 크기의 하한
- fetch.min.bytes ↑ throutput ↑ requestNum ↓ latency ↑
- fetch.max.wait.ms (default 500)
- fetch.min.bytes가 충족되지 않을때 최대 대기시간
- max.partition.fetch.bytes (default 1MB)
- 하나의 partition이 return할 수 있는 최대 데이터 크기
- fetch.max.bytes (default 55MB)
- 각 fetch request의 최대 데이터 크기
Producer
Producer acknowlegements
- Producer는 acks를 통해 data를 보내는 방식을 정할 수 있다.
- akcs=0 : acks를 기다리지 않음 (데이터 손실 가능성 가장 높음)
- 데이터 손실이 용인될 수 있는 상황 & 높은 처리량이 중요할때
- acks=1 : leader의 acks를 기다림 (데이터 손실 가능성이 존재)
- acks=0에 비해 데이터 손실이 적고 처리량이 낮음
- Kafka 2.8 까지는 acks=1이 default
- acks=-1|all : leader와 replicas의 acks를 모두 기다림 (데이터 손실이 없음)
- 여기서 all은 ISR(in-sync replicas)를 의미
- ISR은 leader와 살아있는 replicas
- Kafka 3.0 부터는 all이 default
Producer acks=all & min.insync.replicas
- min.insync.replicas로 최소한 살아있어야하는 replica의 수를 지정가능
- min.insync.replicas의 default 값은 1
- 가장 많이 사용되는? 설정은 acks=all, RF=3, min.insync.replica=2
- min.insync.replicas=1 로 설정하면 leader만 존재해도 되므로 안정성이 떨어진다.
- min.insync.replicas=3 으로 설정하면 하나만 죽어도 refuse되므로 availability가 떨어짐
- broker의 수=N, min.insync.replica=M 인 경우 broker가 N-M개 까지는 죽어도 됨
- min.insync.replicas는 broker에 설정하는 값임
Producer retries
- data가 broker로 잘 전달되지 않은 경우 retry를 해야한다.
- Ex NOT_ENOUGH_REPLICAS(min.insync.replicas)
- retries
- retry 최대 횟수
- Kafka <= 2.0 default 0
- Kafka > 2.0 default 2147483647(2^31-1)
- retries.backoff.ms
- 다음 retry를 개시할때까지 대기시간
- default 100ms
- request.timeout.ms
- send하고 acks를 받는 제한시간
- default 30000
- delivery.timeout.ms
- send() 명령이 발생하고 난 후 success report를 받을 수 있는 제한시간의 상한
- request.timeout.ms와 linger.ms의 합보다 크거나 같아야한다.
- default 120000
Idempotent Producer
- Idempotent Producer는 동일한 데이터에 대해 쓰기요청이 반복적으로 일어날때 쓰기요청이 한번만 일어난 것과 동일한 결과를 얻는 것을 말함
- 만약 Idempotent producer를 사용하지 않는다면
- 데이터의 중복 저장이 발생할 수 있음
- key-based ordering을 사용할 경우 순서가 뒤죽박죽될 수 있음
- max.in.flight.requests.per.connection 을 1로 설정하면 해결가능하지만 처리량이 줄어듦
- max.in.flight.requests.per.connection : 한번에 병렬처리 되는 request의 수
- default 5
- Kafka >= 1.0에서는 idempotent producer를 쓰는 것을 권장
- Kafka >= 3.0에서는 default
- idempotent producer를 사용하면
- Kafka broker가 데이터에 대한 중복 체크가 가능해짐
- max.in.flight.requests.per.connection의 값을 1보다 큰 값을 설정해도 메시지의 순서 보장
- ProducerProps.put("enable.idempotence", true) 로 설정 가능
Message Compression
- Compression 은 Production, Broker에서 일아날 수 있다.
- Compression.type [none(default), gzip, lz4, snappy, zstd] 가 있다.(Kafka 2.1)
- batch size가 클수록 compression 효율이 올라간다.
- 압축으로 얻을 수 있는 이점
- 메시지의 크기가 줄어듬(보통 1/4)
- 전송속도가 빨라짐(=Low latency)
- 높은 처리량
- 효율적인 디스크 사용
- 단점(미미함)
- Producer : Compression을 위해 CPU cycles을 사용해야함
- Consuemr : Decompression을 위해 CPU cycles을 사용해야함
- 보통 sanppy, lz4를 먼저 사용해볼 것을 권장
- linger.ms와 batch.size를 키워 압축률, 처리량을 높일 수 있음
- 스트림의 처리량이 많을 때 반드시 Producer에서 압축을 권장
- broker, topic level에서도 압축을 할 수 있음
- topic : 해당 토픽에만 적용
- broker
- 모든 토픽에 적용
- compression.type=producer : Producer에게 압축을 전적으로 맡김(권장)
- compression.type=none : decompression을 수행
- compression.type=lz4 : 만약 producer의 compression.type이 snappy라면 decompression 한 후 lz4로 압축함
linger.ms & batch.size
- max.in.flight.requests.per.connection 은 flight 상태인 메시지의 상한이다.
- max.in.flight.requests.per.connection 으로 인해 message를 보낼 수 없을때 Kafka는 다음에 전송될 batch를 미리 구성한다.
- batching mechanism에 영향을 주는 요소
- batch.size
- batch size의 상한
- batch size 보다 큰 메시지는 바로 전송됨
- 너무 큰 batch size를 설정하면 out of memory가 발생
- Kafka Producer Metric을 모니터링해 평균 batch size를 모니터링 할 수 있다.
- linger.ms(default 0)
- batch에 더 많은 메시지를 넣기위해 설정한 latency
- 약간의 latency의 대가로 batch 의 크기를 늘릴 수 있어 더 효율적인 압축이 가능
Kafka Consumer Replica Fetching
- 원래 consumer는 각 partition의 leader로 부터 데이터를 받는다.
- consumer와 leader를 저장하고 있는 broker사이의 물리적인 거리가 크다면 latency, cost측면에서 비효율적
- 따라서 Kafka >= 2.4 부터 consumer가 replica set에서 가장 가까운 것을 읽을 수 있게됨
- broker setting
- replica.selector.class를 org.apahce.kafka.common.replica.RackAwareReplicaSelector로 설정
- rack.id config를 data center ID(ex. AZ ID in AWS)로 설정
- Consumer client setting
- client.rack을 data center ID로 설정
Partitioner
key != null
- key 가 null 이 아닐때 레코드는 partitioner 로직에 의해서 어떤 Partition에 할당될지 결정됨
- 이 과정을 key hashing이라고함
- default partitioner는 murmur2 hash 함수를 사용
- targetPartition = Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1)
- key 값이 동일하면 같은 Partition에 할당되는 것을 보장, 하지만 topic에 partition을 추가하게 되면 이것을 보장하지 못함
- 그래서 topic에 partition을 추가하는 것 보다 새로운 topic을 생성하는 것을 권장
- default partitioner logic을 오버라이드 하는 것은 권장되지 않지만 특수한 경우에 오버라이드 해야한다면 partitioner.class를 사용해 변경할 수 있음
key == null
- key 가 null일때 Kafka version에 따라 default partitioner가 다르게 동작한다.
- RoundRobin : Kafka <= 2.3
- 메시지 단위 roundrobin 방식으로 할당
- Sticky Partitioner : Kafka > 2.3
- 배치 단위 roundrobin 방식으로 할당
- 처리량, latency가 줄어듬
- batch.size, linger.ms를 활용하는 방식
max.block.ms & buffer.memory
- producer가 message를 너무 빠른 속도로 생산하여 broker가 이를 따라가지 못하면 producer의 메모리에 records가 buffered된다.
- buffer.memory(default 32MB)는 buffer의 크기를 지정
- 만약 buffer가 모두 차게되면 producer는 .send()를 중단한다.
- max.block.ms(default 60000)동안 buffer가 꽉찬 상태로 유지되면 throw exception
- exception이 발생했다는 것은 보통 broker가 down or overload 되었다는 것을 의미