Kafka 4. topic config

skh951225·2023년 5월 4일
0

Kafka

목록 보기
4/4

topic config 설정하기

공식문서 : https://kafka.apache.org/documentation/#brokerconfigs

topic 에 min.insync.replicas 설정

kafka-topics --bootstrap-server localhost:9092 --topic configured-topic --create --replication-factor 1 --partitions 3

kafka-configs --bootstrap-server localhost:9092 --entity-type topic --entity-name configured-topic --alter --add-config min.insync.replicas=2

kafka-configs --bootstrap-server localhost:9092 --entity-type topic --entity-name configured-topic --describe

kafka-configs --bootstrap-server localhost:9092 --entity-type topic --entity-name configured-topic --alter --delete-config min.insync.replicas

Segment & index

  • topic은 partition으로 구성되어 있고 partition은 다시 segment로 구성된다.
  • Partition 내부의 segment중 쓰기작업중인 것을 ACTIVE 상태에 있다고 말한다.
  • Segment setting
    • log.segment.bytes : 하나의 segment의 최대 크기 (default 1GB)
    • log.segment.ms : segment의 최대 ACTIVE 기간 (default 1 week)
  • Segments come with two indexes
    • An offset to position index : 찾고자하는 message의 위치를 나타내는 index
    • A timestamp to offset index : 각 message의 timestamp를 나타내는 index
    • 각 segment에는 하나의 position index, timestamp index가 존재

Segment setting을 조정하면

  • log.segment.bytes 를 낮추면
    • partition에 더 많은 segment가 존재하게 되어
    • log compaction이 더 자주 일어나고
    • Kafka가 더 많은 파일을 열어놔야한다. → Error:Too many open files 발생 가능성이 높아짐
  • log.segment.ms 를 낮추면
    • log compaction의 최대 주기가 줄어든다.

log cleanup

  • log.cleanup.policy=delete
    • log의 max size를 정하고 max size를 넘어가면 오래된 log 부터 삭제 (default -1==infinite)
    • age of data 기반
  • log.cleanup.policy=compact
    • __consumer_offset 을 저장하는 topic에서 사용됨
    • key값 기준으로 최신 데이터만 저장
  • Why?
    • disk의 데이터를 확보하기 위해 구식 데이터를 삭제
    • Kafka cluster의 maintance work를 제한하기 위해
  • When?
    • Partition segment 단위로 log cleanup이 수행됨
    • 너무 자주 일어나게되면 CPU, RAM 자원을 쓸데없이 잡아먹게됨
    • log.cleaner.backoff.ms에 설정된 값으로 주기적으로 체크 (default 15s)

log.cleanup.policy=delete

  • log.retention.hours
    • data를 가지고 있는 기간 (default 1 week)
    • 이 값을 높게 설정하면 로그가 더 많은 디스크 공간을 차지함
    • log.retention.ms, log.retention.minutes도 존재, 더 작은 단위의 것을 우선시함
  • log.retention.bytes
    • 각 partition에 저장되는 Log의 최대 크기 (default -1==infinite)
  • usecases
    • log.retention.hours=168, log.retention.bytes=-1 : 1 week retetion
    • log.retention.hours=-1, log.retention.bytes=52428800 : 500MB retention

log.cleanup.policy=compact

  • log compaction은 partition 내의 key에 대한 최신 value만 저장
  • full history가 아닌 현재의 snapshot을 원할때 유용
  • log의 꼬리부분부터 읽는 consumer는 topic으로 전해지는 모든 Message를 정상적으로 소비 가능
  • log compaction은 message를 삭제하고 re-ordering은 하지 않음. 즉, message의 ordering은 유지됨
  • log compaction에 해당되는 record라고 바로 삭제하지 않고 delete.retention.ms(default 24h)동안은 보유하고 삭제함
  • log compaction에 대한 주의할 점
    • log compaction은 실시간으로 일어나는 것이 아니고 segmetn가 Commit되면 수행됨. segment가 commit되기 전에는 같은 Key를 가지는 메시지가 존재
    • log의 tail을 읽는 consumer는 log compaction의 효과를 볼 수 없음
    • log Compaction이 항상 잘 실행되지는 않음
      • 충분한 memory가 필요
      • log compaction이 잘 실행되지 않으면 Kafka를 restart
  • log compaction과 관련된 Parameters
    • segment.ms (default 1week)
      • segment가 active상태로 유지될 수 있는 상한
    • segment.bytes (default 1G)
      • segment의 size의 상한
    • min.compaction.lag.ms (default 0)
      • message가 압축되기 전에 기다리는 기간
    • delete.retention.ms (default 1day)
      • delete하기전에 보유하고 있는 기간
    • min.cleanable.dirty.ratio (default 0.5)
      • Compaction이 일어나기 위한 dirty ratio의 하한

uclean.leader.election.enable

  • default false
  • ISR(in-sync replica)이 아닌 OSR(out-of sync replica)를 가지고 있는 broker를 leader로 선출 할 수 있도록 설정하는 옵션
  • OSR을 리더로 선출하기 때문에 데이터 손실이 발생
  • Availability가 Retention보다 중요할때 사용
    • ex metric 수집, log collection

Large Messages

  • Kafka 에서는 기본적으로 message의 크기를 1MB로 제한하고 있다.
  • 크기가 큰 message를 다루는 2가지 패턴
    • 외부 storage(HDFS, S3, Google Cloud Storage ..)를 활용하는 방법
      • Producer는 message를 외부 Storage에 보내고 message의 Metadata를 Kafka에 보냄
      • Consumer는 message의 metadata를 확인하고 외부 Storage에서 해당 데이터를 찾음
    • Kafka parameter를 변경하는 방법
      • Broker side : message.max.bytes
      • Topic side : max.message.bytes
      • Broker-wise :replica.fetch.max.bytes
      • Consumer-side : max.partition.fetch.bytes
      • Producer-sied : max.request.size

0개의 댓글