Kafka 3. Consumer, Producer

skh951225·2023년 5월 3일
0

Kafka

목록 보기
3/4

Consumer

Consumer groups and partition rebalance

  • partition을 consumer들에게 재분배 하는 것을 rebalance라고 한다.
  • rebalance는 두 가지 경우에 발생할 수 있다.
    • consumer가 consumer group에서 합류/이탈하는 경우
    • topic의 partition이 추가되는 경우
  • reblance 전략은 크게 두 부류로 나뉜다.
    • eager rebalance
    • cooperative rebalance
  • Kafka Consumer의 partition.assignment.strategy로 설정가능

Eager rebalance

  • 짧은 시간동안 Consumer들이 동작을 멈추고, 동시에 partition에 대한 membership을 잃는다.
    • 이것을 Stop world event라고 부른다.
  • 정해진 전략에 따라 새로운 Partition을 할당받는다.
  • Eager reblance를 취하면 두 가지 문제가 발생할 수 있다.
    • 재할당이 일어나게 되면 partition이 consumer에게 무작위로 재할당 될 가능성이 있다.
      • Consumer가 원래 소유하고 있던 partition에 대해서 우선권을 가지는 방식을 고려해볼 수 있다.
    • Consumer group에 속한 Consumer들의 Stop world event는 비효율을 초래한다.
      • 재할당 이전과 이후에 동일한 consumer에 할당되는 Partition에 대해선 재할당 과정을 생략하는 것을 고려해볼 수 있다.
  • Eager rebalance의 종류
    • RangeAssignor : topic 단위로 할당하기 때문에 균형을 맞추기 어렵다.
    • RoundRobin : 모든 topic에 걸쳐서 할당하기 때문에 RangeAssignor에 비해 균형적이다.
    • StickyAssignor : 처음에는 RoundRobin방식으로 할당하고 재할당이 일어날땐 파티션의 이동을 최소화하는 방식을 사용

Cooperative rebalance

  • Incremental rebalance라고도 불림
  • Eager rebalance의 문제점을 개선한 전략
  • patition중 일부만 때어서 rebalance하는 방식
  • 일부 partition을 제외한 재할당되지 않은 partition에 대해선 계속 작동됨
  • 이 iteration을 반복하다보면 stable 상태로
    그래서 협력적 리밸런싱이라고
  • Cooperative rebalance의 종류
    • CooperativeStickyAssignor
      • StickyAssignor의 방식에 Cooperative rebalance방식을 적용
      • 즉 재할당이 일어나지 않은 Partition에 대해선 계속 동작

how to use?

  • Kafka Consumer
    • Kafka 3.0의 default assignor는 [RangeAssignor,CooperativeStickyAssignor]
    • 기본값으로 RangeAssignor만 쓰다가 RangeAssignor를 제거하면 CooperativeStickyAssignor를 사용
  • Kafka Connect : Cooperative rebalance가 기본값
  • Kafka Stream : StreamsPartitionAssignor를 기본값으로 사용

Static Group Membership

  • consumer가 이탈해도 partition을 일정시간동안 다른 consumer에 할당하지 않는 방법
  • consumerd의 group.instance.id를 설정하면 Consumer가 나가도 session.timeout.ms 만큼 다른 consumer에게 할당되지 않음
  • Static Group Membership을 사용하면 단순히 consumer를 재시작하고 싶을때 불필요한 rebalance을 방지할 수 있음
  • Kubernetes와 사용하면 유용
  • consumer가 local state, cache를 유지해야 한다면 유용
    • cache를 rebuilding해야하는 상황을 피할 수 있음

Consumer offset commit

  • Java Consumer API는 at-least once senario가 default
    • enable.auto.commit=True
      • auto.commit.interval.ms=5000
      • commit 요청을 보낸 직후의 poll 혹은 최초의 poll이 완료된 후 timer를 작동
      • 5초가 지난 후 진행중인 poll이 완료되면 commitAsync 수행
  • enable.auto.commit=False로 설정하면 명시적으로 commitSync, commitAsync를 호출해줘야함
    • commitSync를 사용하면 exactly once를 구현할 수 있음

heartbeat & poll

  • broker는 consumer의 down여부를 확인하기 위해 heartbeat 매커니즘과 poll 매커니즘을 활용한다.
  • heartbeat
    • 모든 Consumer는 자신의 상태를 알리기위해 주기적으로 coordinator에게 heartbeat를 쏜다.
    • heartbeat.interval.ms (default 3000)
      • heartbeat를 보내는 주기
      • 보통 session.timeout.ms의 1/3로 설정
    • session.timeout.ms (default Kafka >= 3.0 ? 45s:10s)
      • session.timeout.ms 동안 heartbeat를 받지 못하면 해당 Consumer가 죽은것으로 간주
      • 빠른 rebalance를 위해서 낮은 값으로 설정할 수 있다.
  • poll
    • max.poll.interval.ms (default 5m)
      • consumer는 살아있는데 poll 요청이 해당 시간동안 오지 않으면 문제가 있다고 간주
      • data processing에 문제가 있는지 판단하기 위해 사용
    • max.poll.records (default 500)
      • 한번의 Poll 요청으로 받을 수 있는 최대 record수
      • message의 크기가 작고 RAM 여유가 있다면 increase, 반대면 decrease
    • fetch.min.bytes (default 1)
      • 한번의 요청으로 poll해오는 데이터 크기의 하한
      • fetch.min.bytes ↑ throutput ↑ requestNum ↓ latency ↑
    • fetch.max.wait.ms (default 500)
      • fetch.min.bytes가 충족되지 않을때 최대 대기시간
    • max.partition.fetch.bytes (default 1MB)
      • 하나의 partition이 return할 수 있는 최대 데이터 크기
    • fetch.max.bytes (default 55MB)
      • 각 fetch request의 최대 데이터 크기

Producer

Producer acknowlegements

  • Producer는 acks를 통해 data를 보내는 방식을 정할 수 있다.
    • akcs=0 : acks를 기다리지 않음 (데이터 손실 가능성 가장 높음)
      • 데이터 손실이 용인될 수 있는 상황 & 높은 처리량이 중요할때
    • acks=1 : leader의 acks를 기다림 (데이터 손실 가능성이 존재)
      • acks=0에 비해 데이터 손실이 적고 처리량이 낮음
      • Kafka 2.8 까지는 acks=1이 default
    • acks=-1|all : leader와 replicas의 acks를 모두 기다림 (데이터 손실이 없음)
      • 여기서 all은 ISR(in-sync replicas)를 의미
      • ISR은 leader와 살아있는 replicas
      • Kafka 3.0 부터는 all이 default

Producer acks=all & min.insync.replicas

  • min.insync.replicas로 최소한 살아있어야하는 replica의 수를 지정가능
  • min.insync.replicas의 default 값은 1
  • 가장 많이 사용되는? 설정은 acks=all, RF=3, min.insync.replica=2
    • min.insync.replicas=1 로 설정하면 leader만 존재해도 되므로 안정성이 떨어진다.
    • min.insync.replicas=3 으로 설정하면 하나만 죽어도 refuse되므로 availability가 떨어짐
  • broker의 수=N, min.insync.replica=M 인 경우 broker가 N-M개 까지는 죽어도 됨
  • min.insync.replicas는 broker에 설정하는 값임

Producer retries

  • data가 broker로 잘 전달되지 않은 경우 retry를 해야한다.
    • Ex NOT_ENOUGH_REPLICAS(min.insync.replicas)
  • retries
    • retry 최대 횟수
    • Kafka <= 2.0 default 0
    • Kafka > 2.0 default 2147483647(2^31-1)
  • retries.backoff.ms
    • 다음 retry를 개시할때까지 대기시간
    • default 100ms
  • request.timeout.ms
    • send하고 acks를 받는 제한시간
    • default 30000
  • delivery.timeout.ms
    • send() 명령이 발생하고 난 후 success report를 받을 수 있는 제한시간의 상한
    • request.timeout.ms와 linger.ms의 합보다 크거나 같아야한다.
    • default 120000

Idempotent Producer

  • Idempotent Producer는 동일한 데이터에 대해 쓰기요청이 반복적으로 일어날때 쓰기요청이 한번만 일어난 것과 동일한 결과를 얻는 것을 말함
  • 만약 Idempotent producer를 사용하지 않는다면
    • 데이터의 중복 저장이 발생할 수 있음
    • key-based ordering을 사용할 경우 순서가 뒤죽박죽될 수 있음
      • max.in.flight.requests.per.connection 을 1로 설정하면 해결가능하지만 처리량이 줄어듦
      • max.in.flight.requests.per.connection : 한번에 병렬처리 되는 request의 수
      • default 5
  • Kafka >= 1.0에서는 idempotent producer를 쓰는 것을 권장
  • Kafka >= 3.0에서는 default
  • idempotent producer를 사용하면
    • Kafka broker가 데이터에 대한 중복 체크가 가능해짐
    • max.in.flight.requests.per.connection의 값을 1보다 큰 값을 설정해도 메시지의 순서 보장
  • ProducerProps.put("enable.idempotence", true) 로 설정 가능

Message Compression

  • Compression 은 Production, Broker에서 일아날 수 있다.
  • Compression.type [none(default), gzip, lz4, snappy, zstd] 가 있다.(Kafka 2.1)
  • batch size가 클수록 compression 효율이 올라간다.
  • 압축으로 얻을 수 있는 이점
    • 메시지의 크기가 줄어듬(보통 1/4)
    • 전송속도가 빨라짐(=Low latency)
    • 높은 처리량
    • 효율적인 디스크 사용
  • 단점(미미함)
    • Producer : Compression을 위해 CPU cycles을 사용해야함
    • Consuemr : Decompression을 위해 CPU cycles을 사용해야함
  • 보통 sanppy, lz4를 먼저 사용해볼 것을 권장
  • linger.ms와 batch.size를 키워 압축률, 처리량을 높일 수 있음
  • 스트림의 처리량이 많을 때 반드시 Producer에서 압축을 권장
  • broker, topic level에서도 압축을 할 수 있음
    • topic : 해당 토픽에만 적용
    • broker
      • 모든 토픽에 적용
        - compression.type=producer : Producer에게 압축을 전적으로 맡김(권장)
      • compression.type=none : decompression을 수행
        - compression.type=lz4 : 만약 producer의 compression.type이 snappy라면 decompression 한 후 lz4로 압축함

linger.ms & batch.size

  • max.in.flight.requests.per.connection 은 flight 상태인 메시지의 상한이다.
  • max.in.flight.requests.per.connection 으로 인해 message를 보낼 수 없을때 Kafka는 다음에 전송될 batch를 미리 구성한다.
    • 처리량 ↑, latency ↓
  • batching mechanism에 영향을 주는 요소
    • batch.size
      • batch size의 상한
      • batch size 보다 큰 메시지는 바로 전송됨
      • 너무 큰 batch size를 설정하면 out of memory가 발생
      • Kafka Producer Metric을 모니터링해 평균 batch size를 모니터링 할 수 있다.
    • linger.ms(default 0)
      • batch에 더 많은 메시지를 넣기위해 설정한 latency
      • 약간의 latency의 대가로 batch 의 크기를 늘릴 수 있어 더 효율적인 압축이 가능

Kafka Consumer Replica Fetching

  • 원래 consumer는 각 partition의 leader로 부터 데이터를 받는다.
  • consumer와 leader를 저장하고 있는 broker사이의 물리적인 거리가 크다면 latency, cost측면에서 비효율적
  • 따라서 Kafka >= 2.4 부터 consumer가 replica set에서 가장 가까운 것을 읽을 수 있게됨
  • broker setting
    • replica.selector.class를 org.apahce.kafka.common.replica.RackAwareReplicaSelector로 설정
    • rack.id config를 data center ID(ex. AZ ID in AWS)로 설정
  • Consumer client setting
    • client.rack을 data center ID로 설정

Partitioner

key != null

  • key 가 null 이 아닐때 레코드는 partitioner 로직에 의해서 어떤 Partition에 할당될지 결정됨
  • 이 과정을 key hashing이라고함
  • default partitioner는 murmur2 hash 함수를 사용
    • targetPartition = Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1)
  • key 값이 동일하면 같은 Partition에 할당되는 것을 보장, 하지만 topic에 partition을 추가하게 되면 이것을 보장하지 못함
  • 그래서 topic에 partition을 추가하는 것 보다 새로운 topic을 생성하는 것을 권장
  • default partitioner logic을 오버라이드 하는 것은 권장되지 않지만 특수한 경우에 오버라이드 해야한다면 partitioner.class를 사용해 변경할 수 있음

key == null

  • key 가 null일때 Kafka version에 따라 default partitioner가 다르게 동작한다.
  • RoundRobin : Kafka <= 2.3
    • 메시지 단위 roundrobin 방식으로 할당
  • Sticky Partitioner : Kafka > 2.3
    • 배치 단위 roundrobin 방식으로 할당
    • 처리량, latency가 줄어듬
    • batch.size, linger.ms를 활용하는 방식

max.block.ms & buffer.memory

  • producer가 message를 너무 빠른 속도로 생산하여 broker가 이를 따라가지 못하면 producer의 메모리에 records가 buffered된다.
  • buffer.memory(default 32MB)는 buffer의 크기를 지정
  • 만약 buffer가 모두 차게되면 producer는 .send()를 중단한다.
  • max.block.ms(default 60000)동안 buffer가 꽉찬 상태로 유지되면 throw exception
    • exception이 발생했다는 것은 보통 broker가 down or overload 되었다는 것을 의미

0개의 댓글