topic config 설정하기
공식문서 : https://kafka.apache.org/documentation/#brokerconfigs
topic 에 min.insync.replicas 설정
kafka-topics --bootstrap-server localhost:9092 --topic configured-topic --create --replication-factor 1 --partitions 3
kafka-configs --bootstrap-server localhost:9092 --entity-type topic --entity-name configured-topic --alter --add-config min.insync.replicas=2
kafka-configs --bootstrap-server localhost:9092 --entity-type topic --entity-name configured-topic --describe
kafka-configs --bootstrap-server localhost:9092 --entity-type topic --entity-name configured-topic --alter --delete-config min.insync.replicas
Segment & index
- topic은 partition으로 구성되어 있고 partition은 다시 segment로 구성된다.
- Partition 내부의 segment중 쓰기작업중인 것을 ACTIVE 상태에 있다고 말한다.
- Segment setting
- log.segment.bytes : 하나의 segment의 최대 크기 (default 1GB)
- log.segment.ms : segment의 최대 ACTIVE 기간 (default 1 week)
- Segments come with two indexes
- An offset to position index : 찾고자하는 message의 위치를 나타내는 index
- A timestamp to offset index : 각 message의 timestamp를 나타내는 index
- 각 segment에는 하나의 position index, timestamp index가 존재
Segment setting을 조정하면
- log.segment.bytes 를 낮추면
- partition에 더 많은 segment가 존재하게 되어
- log compaction이 더 자주 일어나고
- Kafka가 더 많은 파일을 열어놔야한다. → Error:Too many open files 발생 가능성이 높아짐
- log.segment.ms 를 낮추면
- log compaction의 최대 주기가 줄어든다.
log cleanup
- log.cleanup.policy=delete
- log의 max size를 정하고 max size를 넘어가면 오래된 log 부터 삭제 (default -1==infinite)
- age of data 기반
- log.cleanup.policy=compact
- __consumer_offset 을 저장하는 topic에서 사용됨
- key값 기준으로 최신 데이터만 저장
- Why?
- disk의 데이터를 확보하기 위해 구식 데이터를 삭제
- Kafka cluster의 maintance work를 제한하기 위해
- When?
- Partition segment 단위로 log cleanup이 수행됨
- 너무 자주 일어나게되면 CPU, RAM 자원을 쓸데없이 잡아먹게됨
- log.cleaner.backoff.ms에 설정된 값으로 주기적으로 체크 (default 15s)
log.cleanup.policy=delete
- log.retention.hours
- data를 가지고 있는 기간 (default 1 week)
- 이 값을 높게 설정하면 로그가 더 많은 디스크 공간을 차지함
- log.retention.ms, log.retention.minutes도 존재, 더 작은 단위의 것을 우선시함
- log.retention.bytes
- 각 partition에 저장되는 Log의 최대 크기 (default -1==infinite)
- usecases
- log.retention.hours=168, log.retention.bytes=-1 : 1 week retetion
- log.retention.hours=-1, log.retention.bytes=52428800 : 500MB retention
log.cleanup.policy=compact
- log compaction은 partition 내의 key에 대한 최신 value만 저장
- full history가 아닌 현재의 snapshot을 원할때 유용
- log의 꼬리부분부터 읽는 consumer는 topic으로 전해지는 모든 Message를 정상적으로 소비 가능
- log compaction은 message를 삭제하고 re-ordering은 하지 않음. 즉, message의 ordering은 유지됨
- log compaction에 해당되는 record라고 바로 삭제하지 않고 delete.retention.ms(default 24h)동안은 보유하고 삭제함
- log compaction에 대한 주의할 점
- log compaction은 실시간으로 일어나는 것이 아니고 segmetn가 Commit되면 수행됨. segment가 commit되기 전에는 같은 Key를 가지는 메시지가 존재
- log의 tail을 읽는 consumer는 log compaction의 효과를 볼 수 없음
- log Compaction이 항상 잘 실행되지는 않음
- 충분한 memory가 필요
- log compaction이 잘 실행되지 않으면 Kafka를 restart
- log compaction과 관련된 Parameters
- segment.ms (default 1week)
- segment가 active상태로 유지될 수 있는 상한
- segment.bytes (default 1G)
- min.compaction.lag.ms (default 0)
- delete.retention.ms (default 1day)
- min.cleanable.dirty.ratio (default 0.5)
- Compaction이 일어나기 위한 dirty ratio의 하한
uclean.leader.election.enable
- default false
- ISR(in-sync replica)이 아닌 OSR(out-of sync replica)를 가지고 있는 broker를 leader로 선출 할 수 있도록 설정하는 옵션
- OSR을 리더로 선출하기 때문에 데이터 손실이 발생
- Availability가 Retention보다 중요할때 사용
- ex metric 수집, log collection
Large Messages
- Kafka 에서는 기본적으로 message의 크기를 1MB로 제한하고 있다.
- 크기가 큰 message를 다루는 2가지 패턴
- 외부 storage(HDFS, S3, Google Cloud Storage ..)를 활용하는 방법
- Producer는 message를 외부 Storage에 보내고 message의 Metadata를 Kafka에 보냄
- Consumer는 message의 metadata를 확인하고 외부 Storage에서 해당 데이터를 찾음
- Kafka parameter를 변경하는 방법
- Broker side : message.max.bytes
- Topic side : max.message.bytes
- Broker-wise :replica.fetch.max.bytes
- Consumer-side : max.partition.fetch.bytes
- Producer-sied : max.request.size