어플리케이션은 KafkaConsumer
를 활용하여 이벤트를 읽습니다. 그렇다면 KafkaConsumer
API가 어떻게 작동하는지, 어떠한 설정이 있는지 알아봅시다.
카프카 컨슈머가 메시지를 소비할 때 처리할 수 있는 속도보다 빠르게 토픽에 메시지가 전달되면 처기가 계속해서 뒤로 밀리게 될 것입니다. 따라서 우리는 토픽으로부터 데이터를 읽어 오는 작업을 확장할 수 있어야 합니다.
이런 스케일 아웃
을 위해 컨슈머 그룹을 활용할 수 있습니다.
컨슈머 그룹내부의 컨슈머들은 토픽을 구독합니다. 컨슈머는 각각 하나의 파티션을 할당받아 메시지를 처리합니다. 컨슈머 중 몇몇은 유휴 상태가 되어 메시지를 전혀 받지 못합니다.
토픽
- 파티션 0
- 파티션 1
- 파티션 2
컨슈머 그룹
- 컨슈머 0 <-> 파티션 0
- 컨슈머 1 <-> 파티션 1
- 컨슈머 2 <-> 파티션 2
- 컨슈머 3
(컨슈머 3은 놀고 있습니다.)
메시지를 읽어들여 지연시간이 긴 작업(데이터베이스 쓰기 작업, 데이터에 대해 시간이 오래걸리는 작업 등)을 하는 것은 흔합니다. 따라서 컨슈머를 추가하는 것도 일반적인 규모 확장 방식입니다. 하지만 토픽에 설정된 파티션 수 이상으로 컨슈머를 투입하는 것은 아무 의미없는 짓입니다.
카프카는 컨슈머 그룹에 따라 메시지가 소비되며 카프카의 경우 성능 저하 없이 많은 수의 컨슈머와 컨슈머 그룹을 추가할 수 있습니다.
따라서 새로운 컨슈머 그룹을 추가하게 된다면 토피에 대한 모든 메시지를 받게 됩니다.
컨슈머 그룹에 속한 커슈머들은 구독하는 토픽의 파티션들에 대한 소유권을 공유합니다. 새로운 컨슈머를 컨슈머 그룹에 추가하면 다른 컨슈머가 일고 있던 파티션으로 부터 메시지를 읽기 시작합니다. (크래시되거나 종료됬을 때도 동일)
이러한 작업을 리밸런싱이라고 합니다.
리밸런싱의 종류는 2가지가 있지만 카프카 3.1이상부터는 협력적 리밸런스가 기본값이 되었습니다.
협력적 리밸런스는 한 컨슈머에게 할당되어 있던 파티션만을 다른 컨슈머에게 재할당하는 것을 의미합니다. 재할당되지 않은 파티션에서 레코드를 읽어서 처리하던 컨슈머들은 작업에 방해받지 않고 하던 일을 계속할 수 있는 것입니다.
하나의 컨슈머에 2가지 파티션이 연결되어 있을 때 에러가 발생 협력적 리밸런스는 해당 상황에서 stop the world를 하지 않고 파티션을 리밸런싱 합니다.
컨슈머 그룹의 그룹 코디네이터역할을 지정받은 카프카 브로커에 하트비트를 전송함으로써 생존확인을 진행합니다.
기본적으로 컨슈머가 갖는 컨슈머 그룹의 멤버로써 자격은 일시적입니다. 컨슈머가 컨슈머 그룹을 떠나는 순간 컨슈머에 할당되어 있던 파티션은 해제되고, 다시 참여하면 새로운 멤버 ID가 발급되면서 리밸런스 프로토콜에 의해 새로운 파티션들이 할당됩니다.
하지만 정적 그룹 멤버십은 각 컨슈머의 상태를 캐시해 두었다가, 컨슈머가 종료되었다가 다시 켜졌을 때 동일한 파티션을 연결할 수 있게 해줍니다.
(컨슈머 그룹의 정적 멤버는 종료할 때 미리 컨슈머 그룹을 떠나지 않습니다. session.timeout.ms
설정에 따라 해당 시간만큼 컨슈머가 응답이 없다면, 리밸런싱을 실행하게 됩니다.)
다음은 컨슈머 정의 예제입니다.
val props = Properties()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] =
"---" // 브로커 주소 정의
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] =
"org.apache.kafka.common.serialization.StringDeserializer"
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringDeserializer"
props[ConsumerConfig.GROUP_ID_CONFIG] =
"demo-consumer-group-2" // Consumer는 group단위로 이루어 지며, 그룹이 다르면 서로 같은 데이터들을 서로 다른 어플리케이션에서 소비 가능
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" // earliest, latest, none
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
val consumer = KafkaConsumer<String, String>(props)
프로듀서와 동일하되 ConsumerConfig.GROUP_ID_CONFIG
정도만 추가되어 있습니다.
consumer.subscribe(listOf("demo"))
컨슈머를 생성하고 나서 다음으로 할 일은 1개 이상의 토픽을 구독하는 것입니다. 이는 매개변수로 토픽 리스트
를 받습니다. (리스트 뿐만 아니라 정규식도 가능합니다.) 사용법이 상당히 간단합니다.
컨슈머 API
의 핵심은 서버에 추가 데이터가 들어왔는지 폴링하는 단순한 루프입니다.
val consumer = KafkaConsumer<String, String>(props)
consumer.subscribe(listOf("demo"))
while (true) {
// 최대 5초까지는 데이터가 없어도 대기하겠다. -> 5초 이내의 데이터가 들어오면 이를 소비함
val records: ConsumerRecords<String, String> = consumer.poll(Duration.ofMillis(5000L))
println("polling...")
records.forEach {
println("Key ${it.key()}, Value : ${it.value()}")
}
if (!records.isEmpty) {
consumer.commitAsync()
}
}
위의 루프틑 무한 루프이기 때문에 종료되지 않습니다. poll()
메소드를 활용하여 최대 5초동안 데이터가 들어오기를 대기합니다.
하나의 쓰레드에서 동일한 그룹 내에 여러 개의 컨슈머를 생성할 수는 없으며, 같은 컨슈머를 다수의 스레드가 안전하게 사용할 수도 없습니다.
하나의 스레드 당 하나의 컨슈머
이것이 원칙입니다. 따라서 한 어플리케이션에서 동일한 그룹에 속하는 여러 컨슈머를 운용하고 싶다면 각각의 쓰레드를 분리하여 컨슈머를 돌려야 합니다.
브로커로부터 레코드를 읽어올 때 데이터의 최소량을 지정합니다. 브로커는 이 설정값을 보고 메시지 량이 충분하지 않으면 대기합니다.
컨슈머에게 응답하기 전 충분한 데이터가 모일 때까지 기다릴 수 있습니다. 해당 설정은 얼마나 오래 기다릴 것인지를 의미합니다. 전송할 최소 사이즈가 가득 차거나, 최대 시간이 지나면 브로커는 메시지를 전송합니다.
브로커를 폴링할 때 반환하는 최대 바이트 수를 지정합니다.
폴링할 때 반환되는 최대 레코드 수를 의미합니다.
서버가 파티션 별로 리턴하는 최대 바이트 수를 의미합니다. 이는 브로커에서 응답해온 응답의 파티션을 결정하는 것이 아닙니다. 따라서 이설정으로 메모리 사용량을 조절하는 것은 어렵습니다.
컨슈머와 브로커가 신호를 주고받지 않고도 살아 있는 것으로 판정되는 최대 시간은 기본 10초입니다.
컨슈머가 그룹 코디네이터에게 하트비트를 보내지 않은 채로 session.timeout.ms
만큼 지나게 되면 컨슈머는 죽은 것으로 간주하고, 파티션 리밸런싱을 진행합니다.
heartbeat.interval.ms
는 하트비트를 몇초마다 보낼지를 의미합니다. (보통 1:3비율로 설정함)
컨슈머가 폴링을 하지 않고도 죽은 것으로 판정되지 않을 수 있는 최대 시간을 지정합니다. 이는 session.timeout.ms
와 비슷하다고 느낄 수 있지만 약간 다릅니다.
메인 쓰레드에서 poll()
메소드를 무한정 처리하고 백그라운드 쓰레드에서 하트비트를 계속 전송한다면 이 카프카는 죽지 않는 좀비상태가 됩니다. 따라서 poll
처리에 대한 최대 시간을 설정하는 것이 해당 설정입니다. (기본 값은 5분입니다.)
모든 컨슈머 API
호출에 적용되는 타임아웃 값입니다.
컨슈머가 브로커의 응답을 기다릴 수 있는 최대 시간을 의미합니다. 기본값은 30초
메시지를 어디서 부터 읽어올지를 결정합니다. 값으로는 letest
, earlist
가 있습니다.
컨슈머가 자동으로 오프셋을 커밋할지를 결정합니다. (기본값 true
) true
로 설정한다면 auto.commit.interval.ms
를 활용해 얼마나 자주 오프셋이 커밋될지를 제어할 수 있습니다.
어느 컨슈머에게 어떤 파티션이 할당될지를 설정합니다.
컨슈머가 구독하는 각 토픽의 파티션들을 연속된 그룹으로 나누어 할당합니다.
모든 컨슈머들이 동일한 수의 파티션을 할당받게 합니다.
파티션을 가능한 균등하게 할당하되, 리밸런스가 발생했을 때 많은 파티션이 같은 컨슈머에 할되게 함으로써 하나의 컨슈머에서 다른 컨슈머로 파티션을 옮기는 오버헤드를 줄인 방식입니다.
Sticky
와 동일하게 작동하지만, 협력적 리밸런스 기능을 지원합니다.
클라이언트를 식별할 수 있는 아이디를 의미합니다.(로깅, 모니터링 용)
클러스터가 다수의 데이터센터 혹은 클라우드에 걸쳐 설치되어있는 경우 어떤 레플리카에서 불러올지를 설정합니다.
정적 그룹 멤버십 기능을 제공합니다. -> 정적 그룹 멤버들끼리는 컨슈머가 꺼졌다 켜져도 같은 파티션을 할당받기에 리밸런싱이 일어나지 않음
데이터를 읽거나 쓸 때 소켓이 사용하는 TCP
의 수신 및 수신 버퍼의 크기를 가르킵니다. (-1은 기본 OS값)
이는 브로커 설정입니다. 컨슈머 그룹에 현재 돌아가는 컨슈머가 없을 때 오프셋을 얼마나 저장할지를 저장합니다.
poll()
을 호출할 때마다 카프카에 쓰여진 메시지 중 컨슈머 그룹에 속한 컨슈머가 읽지 않은 메시지를 리턴합니다.
하지만 컨슈머측에서 commit
을 하여 오프셋을 저장하지 않는다면 같은 메시지를 전송하게 되어 중복이 발생하게 됩니다.
카프카에선이 문제를 해결하기위해 다양한 방법을 지원합니다.
컨슈머가 오프셋을 자동으로 커밋하게 합니다. 하지만 이도 완벽하지 않은게 5초마다 커밋을 한다고하면 3초가 지난뒤 크래쉬가 발생했을 때 어쩔 수 ㅓㅇㅂㅅ이 중복이 생기게 됩니다.
commitAsync()
를 활용하여 커밋을 수동으로 진행할 수 있습니다.
수동 커밋은 브로커가 커밋 요청에 응답할 때 까지 어플리케이션이 블록됩니다. 이는 처리량을 제한하게 됩니다.
commitSync()
를 활용해 커밋이 성공하거나 회복 불가능한 에러가 발생할때까지 재시도할 수 있습니다.
CommitAsync()
는 특정 오프셋을 직접 넣어서 커밋할 수 있습니다.
컨슈머가 종료되기 전이나 리밸런싱이 시작되기 전에 정리작업을 해주어야 할 수도 있습니다. 즉 컨슈머에 할당된 파티션이 해제될 것이라는 걸 알게 된다면 해당 파티션에서 마지막으로 처리한 이벤트의 오프셋을 커밋해주어야 합니다.
컨슈머 API
는 파티션이 할당되거나 해제될 때 사용자의 코드가 실행되도록 하는 메커니즘을 제공합니다.
내부 함수를 오버라이드하여 리밸런스의 인터셉터 역할을 수행할 수 있습니다.
seekToBeginning
, seekToEnd
을 활용하여 메시지를 특정 지점 부터 읽어올 수 있습니다.
위의 예제에서는 무한 루프에서 폴링을 실행하였는데 더 좋은 방법이 있습니다.
컨슈머를 종료하고자 할 때, 컨슈머가 poll()
을 오랫동안 기다리고 있더라고 즉시 루프를 탈출하고 싶다면 다른 스레드에서 consumer.wakeup()
를 활용하면 됩니다. 이는 poll()
이 WakeupException
을 발생시키며 중단되거나, 대기중이 아닐 경우에는 다음 번에 청므으로 poll()
을 호출할 때 예외를 발생시킵니다.
wakeup()
메소드는 다른 쓰레드에서 호출할 때만 안전하게 작동하는 컨슈머 메소드입니다.
카프카에 데이터를 쓰기 전 커스텀 객체를 바이트로 변환시키기 위해 시리얼라이저가 필요합니다. 또한 바이트배열을 자바 객체로 변환하기 위해서도 디시리얼라이저가 필요합니다.
이러한 디시리얼라이저는 커스텀해서 직접만들 수도 있지만, 범용 시리얼라이저(avro)를 활용하는 것을 권장합니다.
List<T>
를 시리얼라이즈 하는 것이 가능한가요?가능합니다. 하지만 중첩된 타입에 대한 기능인 만큼 사용방법이 복잡합니다. 설정할 때 내부적으로 리스트와 리스트 안의 객체에 대한 설정 값을 모두 props
로 저장해야 합니다. 이는 필요할 때 구글링하여 사용하기
컨슈머 그룹은 컨슈머들에게 파티션을 할당하고, 리밸런싱해주고, 스케일 아웃까지 제공한다. 하지만 하나의 컨슈머가 토픽의 모든 파티션으로 부터 데이터를 읽어와야 하거나 토픽의 특정 파티션으로부터 데이터를 읽어야할 경우가 있습니다.
이런 특별한 경우는 토픽을 구독할 필요 없이 파티션을 스스로 할당받을 수 있습니다.
val partitionInfos = consumer.partitionsFor("topic")
partitionInfos?.let { partition ->
partitions.add(partition.topic, partition.partition)
}
consumer.assign(partitions)
while (true) {
cosumer.poll(timeout)
// ...
}
카프카의 컨슈머는 Buffer
를 가지고 있습니다. poll()
요청이 들어온다면 컨슈머는 fetch
를 통해 컨슈머의 버퍼에 데이터를 채웁니다. 만약 버퍼에 데이터가 있다면, 해당 데이터를 poll.max.bytes
만큼 불러서 처리합니다.
즉, 컨슈머의 poll()
은 2가지 분기처리를 통해 메시지를 처리합니다.