컨슈머는 커밋 과정을 반복한다. 카프카 브로커로부터 데이터를 어디까지 가져갔는지 커밋을 통해 기록하는 것이다.
오프셋 커밋은 컨슈머 애플리케이션에서 명시적, 비명시적으로 수행할 수 있다.
기본 옵션은 poll()
메서드가 수행될 때 일정 간격마다 오프셋을 커밋하도록 enable.auto.commit=true
로 설정되어 있다. 이를 비명시 오프셋 커밋이라 한다.
장단점은 다음과 같다.
poll()
메서드를 호출할 때 커밋을 수행하므로 코드상에서 따로 커밋 관련 코드를 작성할 필요가 없다.poll()
메서드 호출 뒤 리밸런싱 또는 컨슈머 강제종료 발생 시 컨슈머가 처리하는 데이터가 중복 또는 유실될 수 있는 가능성이 있다.명시적으로 오프셋을 커밋하려면 poll()
메서드 호출 이후 반환받은 데이터의 처리가 완료되고 commitSync()
메서드를 호출하면 된다. 이 메서드는 poll()
메서드를 통해 반환된 레코드의 가장 마지막 오프셋을 기준으로 커밋을 수행한다.
commitSync()
메서드는 브로커에 커밋 요청을 하고 커밋이 정상적으로 처리되었는지 응답하기까지 기다리는데 이는 컨슈머의 처리량에 영향을 끼친다. 데이터 처리 시간에 비해 커밋 요청 및 응답에 시간이 오래 걸린다면 동일 시간당 데이터 처리량이 줄어들기 때문이다.
이를 해결하기 위해 commitAsync()
메서드를 통해 커밋 요청을 전성하고 응답이 오기 전까지 데이터 처리를 수행하곤 한다. 하지만, 이러한 비동기 커밋은 커밋 요청이 실패했을 경우 현재 처리 중인 데이터의 순서를 보장하지 않으며 데이터의 중복 처리가 발생할 수 있다.
auto.offset.reset
설정 사용earliest
: 맨 처음 오프셋 사용latest
: 가장 마지막 오프셋 사용 (default)none
: 컨슈머 그룹에 대한 이전 커밋이 없으면 Exception 발생fetch.min.bytes
: 조회시 브로커가 전송할 최소 데이터 크기fetch.max.wait.ms
: 데이터가 최소 크기가 될 때까지 기다리는 시간poll()
메서드의 대기 시간과 다름max.partition.fetch.bytes
: 파티션 당 서버가 리턴할 수 있는 최대 크기.enable.auto.commit
true
: 일정 주기로 컨슈머가 읽은 오프셋을 커밋 (default)false
: 수동으로 커밋 실행auto.commit.interval.ms
: 자동 커밋 주기poll()
, close()
메서드 호출 시 자동 커밋 실행컨슈머는 하트비트를 전송해서 연결을 유지한다. 브로커는 일정 시간 컨슈머로부터 하트비트가 없으면 컨슈머를 그룹에서 빼고 리밸런스를 진행한다.
관련 설정은 다음과 같다.
session.timeout.ms
: 세션 타임 아웃 시간 (defalut 10초)heartbeat.interval.ms
: 하트비트 전송 주기 (default 3초)session.timeout.ms
의 1/3 이하 추천max.poll.interval.ms
는 poll()
메서드의 최대 호출 간격을 설정한다. 이 시간이 지나도록 poll()
하지 않으면 컨슈머를 그룹에서 빼고 리밸런스를 진행한다.
KafkaConsumer
는 쓰레드에 안전하지 않다. 여러 쓰레드에서 동시에 사용하면 안된다. 단, wakeup()
메서드는 예외이다.