[카프카 핵심 가이드] Chapter 04. 카프카 컨슈머 : 카프카에서 데이터 읽기

Falco·2023년 11월 25일
0
post-thumbnail

카프카에서 데이터 읽기

어플리케이션은 KafkaConsumer를 활용하여 이벤트를 읽습니다. 그렇다면 KafkaConsumer API가 어떻게 작동하는지, 어떠한 설정이 있는지 알아봅시다.

4.1 카프카 컨슈머 : 개념

4.1.1 컨슈머와 컨슈머 그룹

카프카 컨슈머가 메시지를 소비할 때 처리할 수 있는 속도보다 빠르게 토픽에 메시지가 전달되면 처기가 계속해서 뒤로 밀리게 될 것입니다. 따라서 우리는 토픽으로부터 데이터를 읽어 오는 작업을 확장할 수 있어야 합니다.

이런 스케일 아웃을 위해 컨슈머 그룹을 활용할 수 있습니다.

컨슈머 그룹내부의 컨슈머들은 토픽을 구독합니다. 컨슈머는 각각 하나의 파티션을 할당받아 메시지를 처리합니다. 컨슈머 중 몇몇은 유휴 상태가 되어 메시지를 전혀 받지 못합니다.

  • 토픽
    - 파티션 0
    - 파티션 1
    - 파티션 2

  • 컨슈머 그룹
    - 컨슈머 0 <-> 파티션 0
    - 컨슈머 1 <-> 파티션 1
    - 컨슈머 2 <-> 파티션 2
    - 컨슈머 3

(컨슈머 3은 놀고 있습니다.)

메시지를 읽어들여 지연시간이 긴 작업(데이터베이스 쓰기 작업, 데이터에 대해 시간이 오래걸리는 작업 등)을 하는 것은 흔합니다. 따라서 컨슈머를 추가하는 것도 일반적인 규모 확장 방식입니다. 하지만 토픽에 설정된 파티션 수 이상으로 컨슈머를 투입하는 것은 아무 의미없는 짓입니다.

카프카는 컨슈머 그룹에 따라 메시지가 소비되며 카프카의 경우 성능 저하 없이 많은 수의 컨슈머와 컨슈머 그룹을 추가할 수 있습니다.

따라서 새로운 컨슈머 그룹을 추가하게 된다면 토피에 대한 모든 메시지를 받게 됩니다.

4.1.2 컨슈머 그룹과 파티션 리밸런스

컨슈머 그룹에 속한 커슈머들은 구독하는 토픽의 파티션들에 대한 소유권을 공유합니다. 새로운 컨슈머를 컨슈머 그룹에 추가하면 다른 컨슈머가 일고 있던 파티션으로 부터 메시지를 읽기 시작합니다. (크래시되거나 종료됬을 때도 동일)

이러한 작업을 리밸런싱이라고 합니다.

리밸런싱의 종류는 2가지가 있지만 카프카 3.1이상부터는 협력적 리밸런스가 기본값이 되었습니다.

협력적 리밸런스

협력적 리밸런스는 한 컨슈머에게 할당되어 있던 파티션만을 다른 컨슈머에게 재할당하는 것을 의미합니다. 재할당되지 않은 파티션에서 레코드를 읽어서 처리하던 컨슈머들은 작업에 방해받지 않고 하던 일을 계속할 수 있는 것입니다.

하나의 컨슈머에 2가지 파티션이 연결되어 있을 때 에러가 발생 협력적 리밸런스는 해당 상황에서 stop the world를 하지 않고 파티션을 리밸런싱 합니다.

컨슈머 그룹의 그룹 코디네이터역할을 지정받은 카프카 브로커에 하트비트를 전송함으로써 생존확인을 진행합니다.

4.1.3 정적 그룹 멤버십

기본적으로 컨슈머가 갖는 컨슈머 그룹의 멤버로써 자격은 일시적입니다. 컨슈머가 컨슈머 그룹을 떠나는 순간 컨슈머에 할당되어 있던 파티션은 해제되고, 다시 참여하면 새로운 멤버 ID가 발급되면서 리밸런스 프로토콜에 의해 새로운 파티션들이 할당됩니다.

하지만 정적 그룹 멤버십은 각 컨슈머의 상태를 캐시해 두었다가, 컨슈머가 종료되었다가 다시 켜졌을 때 동일한 파티션을 연결할 수 있게 해줍니다.

(컨슈머 그룹의 정적 멤버는 종료할 때 미리 컨슈머 그룹을 떠나지 않습니다. session.timeout.ms 설정에 따라 해당 시간만큼 컨슈머가 응답이 없다면, 리밸런싱을 실행하게 됩니다.)

4.2 카프카 컨슈머 생성하기

다음은 컨슈머 정의 예제입니다.

val props = Properties()

props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] =
    "---" // 브로커 주소 정의
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] =
    "org.apache.kafka.common.serialization.StringDeserializer"
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringDeserializer"
props[ConsumerConfig.GROUP_ID_CONFIG] =
    "demo-consumer-group-2" // Consumer는 group단위로 이루어 지며, 그룹이 다르면 서로 같은 데이터들을 서로 다른 어플리케이션에서 소비 가능
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" // earliest, latest, none
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"

val consumer = KafkaConsumer<String, String>(props)

프로듀서와 동일하되 ConsumerConfig.GROUP_ID_CONFIG 정도만 추가되어 있습니다.

토픽 구독하기

consumer.subscribe(listOf("demo"))

컨슈머를 생성하고 나서 다음으로 할 일은 1개 이상의 토픽을 구독하는 것입니다. 이는 매개변수로 토픽 리스트를 받습니다. (리스트 뿐만 아니라 정규식도 가능합니다.) 사용법이 상당히 간단합니다.

4.4 폴링 루프

컨슈머 API의 핵심은 서버에 추가 데이터가 들어왔는지 폴링하는 단순한 루프입니다.

val consumer = KafkaConsumer<String, String>(props)
consumer.subscribe(listOf("demo"))

while (true) {
    // 최대 5초까지는 데이터가 없어도 대기하겠다. -> 5초 이내의 데이터가 들어오면 이를 소비함
    val records: ConsumerRecords<String, String> = consumer.poll(Duration.ofMillis(5000L))

    println("polling...")

    records.forEach {
        println("Key ${it.key()}, Value : ${it.value()}")
    }

    if (!records.isEmpty) {
        consumer.commitAsync()
    }
}

위의 루프틑 무한 루프이기 때문에 종료되지 않습니다. poll()메소드를 활용하여 최대 5초동안 데이터가 들어오기를 대기합니다.

4.4.1 쓰레드 안정성

하나의 쓰레드에서 동일한 그룹 내에 여러 개의 컨슈머를 생성할 수는 없으며, 같은 컨슈머를 다수의 스레드가 안전하게 사용할 수도 없습니다.

하나의 스레드 당 하나의 컨슈머

이것이 원칙입니다. 따라서 한 어플리케이션에서 동일한 그룹에 속하는 여러 컨슈머를 운용하고 싶다면 각각의 쓰레드를 분리하여 컨슈머를 돌려야 합니다.

4.5 컨슈머 설정하기

fetch.min.bytes

브로커로부터 레코드를 읽어올 때 데이터의 최소량을 지정합니다. 브로커는 이 설정값을 보고 메시지 량이 충분하지 않으면 대기합니다.

fetch.max.wait.ms

컨슈머에게 응답하기 전 충분한 데이터가 모일 때까지 기다릴 수 있습니다. 해당 설정은 얼마나 오래 기다릴 것인지를 의미합니다. 전송할 최소 사이즈가 가득 차거나, 최대 시간이 지나면 브로커는 메시지를 전송합니다.

fetch.max.bytes

브로커를 폴링할 때 반환하는 최대 바이트 수를 지정합니다.

max.poll.records

폴링할 때 반환되는 최대 레코드 수를 의미합니다.

max.partition.fetch.bytes

서버가 파티션 별로 리턴하는 최대 바이트 수를 의미합니다. 이는 브로커에서 응답해온 응답의 파티션을 결정하는 것이 아닙니다. 따라서 이설정으로 메모리 사용량을 조절하는 것은 어렵습니다.

session.timeout.ms 그리고 heartbeat.interval.ms

컨슈머와 브로커가 신호를 주고받지 않고도 살아 있는 것으로 판정되는 최대 시간은 기본 10초입니다.

컨슈머가 그룹 코디네이터에게 하트비트를 보내지 않은 채로 session.timeout.ms만큼 지나게 되면 컨슈머는 죽은 것으로 간주하고, 파티션 리밸런싱을 진행합니다.

heartbeat.interval.ms는 하트비트를 몇초마다 보낼지를 의미합니다. (보통 1:3비율로 설정함)

max.poll.interval.ms

컨슈머가 폴링을 하지 않고도 죽은 것으로 판정되지 않을 수 있는 최대 시간을 지정합니다. 이는 session.timeout.ms와 비슷하다고 느낄 수 있지만 약간 다릅니다.

메인 쓰레드에서 poll()메소드를 무한정 처리하고 백그라운드 쓰레드에서 하트비트를 계속 전송한다면 이 카프카는 죽지 않는 좀비상태가 됩니다. 따라서 poll처리에 대한 최대 시간을 설정하는 것이 해당 설정입니다. (기본 값은 5분입니다.)

default.api.timeout.ms

모든 컨슈머 API호출에 적용되는 타임아웃 값입니다.

request.timeout.ms

컨슈머가 브로커의 응답을 기다릴 수 있는 최대 시간을 의미합니다. 기본값은 30초

auto.offset.reset

메시지를 어디서 부터 읽어올지를 결정합니다. 값으로는 letest, earlist가 있습니다.

enable.auto.commit

컨슈머가 자동으로 오프셋을 커밋할지를 결정합니다. (기본값 true) true로 설정한다면 auto.commit.interval.ms를 활용해 얼마나 자주 오프셋이 커밋될지를 제어할 수 있습니다.

partition.assignment.strategy

어느 컨슈머에게 어떤 파티션이 할당될지를 설정합니다.

  • Range

컨슈머가 구독하는 각 토픽의 파티션들을 연속된 그룹으로 나누어 할당합니다.

  • RoundRobin

모든 컨슈머들이 동일한 수의 파티션을 할당받게 합니다.

  • Sticky

파티션을 가능한 균등하게 할당하되, 리밸런스가 발생했을 때 많은 파티션이 같은 컨슈머에 할되게 함으로써 하나의 컨슈머에서 다른 컨슈머로 파티션을 옮기는 오버헤드를 줄인 방식입니다.

  • Cooperative Sticky

Sticky와 동일하게 작동하지만, 협력적 리밸런스 기능을 지원합니다.

client.id

클라이언트를 식별할 수 있는 아이디를 의미합니다.(로깅, 모니터링 용)

client.rack

클러스터가 다수의 데이터센터 혹은 클라우드에 걸쳐 설치되어있는 경우 어떤 레플리카에서 불러올지를 설정합니다.

group.instance.id

정적 그룹 멤버십 기능을 제공합니다. -> 정적 그룹 멤버들끼리는 컨슈머가 꺼졌다 켜져도 같은 파티션을 할당받기에 리밸런싱이 일어나지 않음

receive.buffer.bytes, send.buffer.bytes

데이터를 읽거나 쓸 때 소켓이 사용하는 TCP의 수신 및 수신 버퍼의 크기를 가르킵니다. (-1은 기본 OS값)

offsets.retention.minutes

이는 브로커 설정입니다. 컨슈머 그룹에 현재 돌아가는 컨슈머가 없을 때 오프셋을 얼마나 저장할지를 저장합니다.

4.6 오프셋과 커밋

poll()을 호출할 때마다 카프카에 쓰여진 메시지 중 컨슈머 그룹에 속한 컨슈머가 읽지 않은 메시지를 리턴합니다.

하지만 컨슈머측에서 commit을 하여 오프셋을 저장하지 않는다면 같은 메시지를 전송하게 되어 중복이 발생하게 됩니다.

카프카에선이 문제를 해결하기위해 다양한 방법을 지원합니다.

4.6.1 자동 커밋

컨슈머가 오프셋을 자동으로 커밋하게 합니다. 하지만 이도 완벽하지 않은게 5초마다 커밋을 한다고하면 3초가 지난뒤 크래쉬가 발생했을 때 어쩔 수 ㅓㅇㅂㅅ이 중복이 생기게 됩니다.

4.6.2 현재 오프셋 커밋하기

commitAsync()를 활용하여 커밋을 수동으로 진행할 수 있습니다.

수동 커밋은 브로커가 커밋 요청에 응답할 때 까지 어플리케이션이 블록됩니다. 이는 처리량을 제한하게 됩니다.

commitSync()를 활용해 커밋이 성공하거나 회복 불가능한 에러가 발생할때까지 재시도할 수 있습니다.

CommitAsync()는 특정 오프셋을 직접 넣어서 커밋할 수 있습니다.

4.7 리밸런스 리스너

컨슈머가 종료되기 전이나 리밸런싱이 시작되기 전에 정리작업을 해주어야 할 수도 있습니다. 즉 컨슈머에 할당된 파티션이 해제될 것이라는 걸 알게 된다면 해당 파티션에서 마지막으로 처리한 이벤트의 오프셋을 커밋해주어야 합니다.

컨슈머 API는 파티션이 할당되거나 해제될 때 사용자의 코드가 실행되도록 하는 메커니즘을 제공합니다.

내부 함수를 오버라이드하여 리밸런스의 인터셉터 역할을 수행할 수 있습니다.

4.8 특정 오프셋의 레코드 읽어오기

seekToBeginning, seekToEnd을 활용하여 메시지를 특정 지점 부터 읽어올 수 있습니다.

4.9 폴링 루프 벗어나기

위의 예제에서는 무한 루프에서 폴링을 실행하였는데 더 좋은 방법이 있습니다.

컨슈머를 종료하고자 할 때, 컨슈머가 poll()을 오랫동안 기다리고 있더라고 즉시 루프를 탈출하고 싶다면 다른 스레드에서 consumer.wakeup()를 활용하면 됩니다. 이는 poll()WakeupException을 발생시키며 중단되거나, 대기중이 아닐 경우에는 다음 번에 청므으로 poll()을 호출할 때 예외를 발생시킵니다.

wakeup()메소드는 다른 쓰레드에서 호출할 때만 안전하게 작동하는 컨슈머 메소드입니다.

4.10 디시리얼라이저

카프카에 데이터를 쓰기 전 커스텀 객체를 바이트로 변환시키기 위해 시리얼라이저가 필요합니다. 또한 바이트배열을 자바 객체로 변환하기 위해서도 디시리얼라이저가 필요합니다.

이러한 디시리얼라이저는 커스텀해서 직접만들 수도 있지만, 범용 시리얼라이저(avro)를 활용하는 것을 권장합니다.

List<T>를 시리얼라이즈 하는 것이 가능한가요?

가능합니다. 하지만 중첩된 타입에 대한 기능인 만큼 사용방법이 복잡합니다. 설정할 때 내부적으로 리스트와 리스트 안의 객체에 대한 설정 값을 모두 props로 저장해야 합니다. 이는 필요할 때 구글링하여 사용하기

4.11 독립 실행 컨슈머 - 컨슈머 그룹 없이 컨슈머를 사용해야 하는 이유와 방법

컨슈머 그룹은 컨슈머들에게 파티션을 할당하고, 리밸런싱해주고, 스케일 아웃까지 제공한다. 하지만 하나의 컨슈머가 토픽의 모든 파티션으로 부터 데이터를 읽어와야 하거나 토픽의 특정 파티션으로부터 데이터를 읽어야할 경우가 있습니다.

이런 특별한 경우는 토픽을 구독할 필요 없이 파티션을 스스로 할당받을 수 있습니다.

val partitionInfos = consumer.partitionsFor("topic")
partitionInfos?.let { partition ->
	partitions.add(partition.topic, partition.partition)
}

consumer.assign(partitions)

while (true) {
	cosumer.poll(timeout)
    // ...
}

4.12 요약

컨슈머 그룹이 무엇인지?

  1. 메시지를 순차적으로 하나씩만 처리할 수 있게 해줌
  2. 리밸런싱을 통해 파티션을 나누기
  3. 그룹 사용하지 않고 파티션 직접 할당 가능

컨슈머 설정에 대하여

  1. 타임아웃에 관련된 여러가지 설정에 대하여

오프셋과 커밋에 대하여

(디)시리얼라이저와 사용법

컨슈머의 버퍼에 대하여

카프카의 컨슈머는 Buffer를 가지고 있습니다. poll()요청이 들어온다면 컨슈머는 fetch를 통해 컨슈머의 버퍼에 데이터를 채웁니다. 만약 버퍼에 데이터가 있다면, 해당 데이터를 poll.max.bytes만큼 불러서 처리합니다.

즉, 컨슈머의 poll()은 2가지 분기처리를 통해 메시지를 처리합니다.

https://velog.io/@hyeondev/%EC%B9%B4%ED%94%84%EC%B9%B4-%EC%BB%A8%EC%8A%88%EB%A8%B8%EC%97%90%EC%84%9C-poll-%EC%9D%84-%EC%9A%94%EC%B2%AD%ED%95%98%EB%A9%B4-%EC%96%B4%EB%96%A4-%EC%9D%BC%EC%9D%B4-%EC%9D%BC%EC%96%B4%EB%82%A0%EA%B9%8C

profile
강단있는 개발자가 되기위하여

0개의 댓글