1편과 2편을 통해서 컨슈머가 종료되었을 때 컨슈머 서비스와 브로커에서 일어나는 일련의 동작들에 대해서 알아봤다. 이 과정에서 디버깅 시 겪었던 자동컨슘(?) 상황도 이해할 수 있게 되었다. 이번엔 반대로 그러면 컨슈머가 생겨났을 때, 컨슈머 서비스와 브로커에선 각각 어떤 동작이 일어나는지 확인해보고자 한다!
1. 브로커가 컨슈머의 생존여부를 감지하는 방법 (Kafka Client 1편)
2. 컨슈머가 종료될 때 브로커로 전달되는 내용 (정상종료 / 비정상 종료)
3. 컨슈머가 살아났을 때 브로커로부터 가져오는 데이터
간단하게 생각해보자. Consumer가 재기동 되면, 포함된 브로커로 poll 요청을 하게 될 태고, 새로운 Consumer가로부터 요청이 왔음을 감지한 Broker는 리벨런싱 작업을 하게 될 것이다.
리벨런싱 이후 추가된 컨슈머까지 정상적으로 이벤트를 수신하게되지 않을까 한다.
실제로 비슷한 플로우를 통해서 브로커에 등록 및 컨슈머가 데이터를 소비하게 된다.
Consumer가 처음 동작하기 시작한 시점부터 알아보자.
Spring boot Application이 동작하면서 Application Context들을 띄우게 되고
SmartLifecycle 인터페이스를 구현하는 모든 Bean을 찾아서 start (=doStart() 함수)를 호출하게 된다.
SmartLivecycle인터페이스의 위치를 찾아 거슬러 올라가보면
AbstractMessageListenerContainer > GenericMessageListenerContainer > MessageListenerContainer > "SmartLifecycle"
public class ConcurrentMessageListenerContainer extends AbstractMessageListenerContainer {
...
...
@Override
protected void doStart() {
if (!isRunning()) {
checkTopics(); // 토픽 유효성 검사
//ContainerProperties 객체는 메시지 처리 방식에 대한 정의를 담고있다. (어느 토픽,파티션에서 어떤 주기,크기로)
ContainerProperties containerProperties = getContainerProperties();
//파티션의 offset 조회 (없으면 null)
TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
// 동시 소비 스레드(concurrency)가 파티션 수보다 많으면 경고를 내보냄. → 파티션보다 스레드가 많아봤자 의미가 없으니 concurrency를 topicPartitions.length로 조정.
if (topicPartitions != null && this.concurrency > topicPartitions.length) {
this.logger.warn(() -> "When specific partitions are provided, the concurrency must be less than or "
+ "equal to the number of partitions; reduced from " + this.concurrency + " to "
+ topicPartitions.length);
this.concurrency = topicPartitions.length;
}
clearState(); // 설정 초기화
setRunning(true); // 실행상태로 변경
for (int i = 0; i < this.concurrency; i++) {
// 실제 메시지를 소비하는 객체 생성
KafkaMessageListenerContainer<K, V> container =
constructContainer(containerProperties, topicPartitions, i);
configureChildContainer(i, container);
if (isPauseRequested()) {
container.pause();
}
container.start(); // KafkaMessageListenerContainer.doStart() 호출부
this.containers.add(container);
}
}
...
...
}
}
KafkaMessageListnerContaier 객체 생성 후, doStart()를 통해 컨테이너를 실행시키게 된다.
이때, 내부적으로 실제 컨슘을 담당하게될 ListenerContainer (스레드)를 만들고, containerExecutor를 통해서 실행하게 된다.
사실 굳이 "그냥 비동기 수행하지 왜 ContainerExecutor 만들어서 수행하지?"라는 생각이 들었지만, 작고 어린 나의 시각에선 별개의 스레드에서 돈다는 명확한 실행장치가 필요했던걸까..? 싶기도 하다. 혹은 그냥 스레드 이름을 지정하고싶어서...!? 로깅 및 디버깅에 유리하니까..!? - 헉 이거같다..! - 사실 잘 모름
public class KafkaMessageListenerContainer {
...
...
@Override
protected void doStart() {
// 이미 실행중이라면 return
if (isRunning()) {
return;
}
if (this.clientIdSuffix == null) { // stand-alone container
checkTopics();
}
ContainerProperties containerProperties = getContainerProperties();
Object messageListener = containerProperties.getMessageListener();
// consumerExecutor 생성 (각 KafkaConsumer Container가 독립된 스레드에서 메시지를 소비하도록 하기 위해)
AsyncTaskExecutor consumerExecutor = containerProperties.getListenerTaskExecutor();
if (consumerExecutor == null) {
consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setListenerTaskExecutor(consumerExecutor);
}
GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
ListenerType listenerType = determineListenerType(listener);
ObservationRegistry observationRegistry = containerProperties.getObservationRegistry();
if (observationRegistry.isNoop()) {
ApplicationContext applicationContext = getApplicationContext();
if (applicationContext != null && containerProperties.isObservationEnabled()) {
ObservationRegistry reg = applicationContext.getBeanProvider(ObservationRegistry.class)
.getIfUnique();
if (reg != null) {
observationRegistry = reg;
}
}
}
// 생성자에서 Subscribe -> 토픽만 있는 경우 브로커로부터 파티션 정보 받아옴 (리벨런싱 동작을 수반함)
this.listenerConsumer = new ListenerConsumer(listener, listenerType, observationRegistry);
setRunning(true);
this.startLatch = new CountDownLatch(1);
// CompletableFuture.runAsync(this.listenerConsumer); 를 통해 ListenerConsumer의 run() 함수 수행
this.listenerConsumerFuture = consumerExecutor.submitCompletable(this.listenerConsumer);
try {
// Latch를 통해 수행을 특정 시간동안 대기. 실패시 로직 수행
if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(),
TimeUnit.MILLISECONDS)) {
this.logger.error("Consumer thread failed to start - does the configured task executor "
+ "have enough threads to support all containers and concurrency?");
publishConsumerFailedToStart();
}
}
catch (@SuppressWarnings(UNUSED) InterruptedException e) {
Thread.currentThread().interrupt();
}
}
...
...
}
ListenerConsumer의 생성자 로직에는 각각 설정들을 적용하면서, 브로커와의 통신을 통해 어떤 토픽의 파티션을 소비하게되는지 설정값을 가져오게된다.
내부 subscribeOrAssignTopics()
함수를 살펴보게되면 파티션 정보가 없으면, 브로커에게 리벨런싱 및 파티션 정보를 얻어오게 된다. (subscribe 과정)
이후, 우리가 이전에(1편,2편)에 봤던 run() 함수를 consumerExecutor.submitCompletable(this.listenerConsumer);
통해 수행하면서 poll() 과정이 시작된다.
public class ListenerConsumer {
ListnerConsumer(...) {
...
...
subscribeOrAssignTopics(consumer);
...
}
private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscribingConsumer) {
if (KafkaMessageListenerContainer.this.topicPartitions == null) {
ConsumerRebalanceListener rebalanceListener = new ListenerConsumerRebalanceListener();
Pattern topicPattern = this.containerProperties.getTopicPattern();
if (topicPattern != null) {
subscribingConsumer.subscribe(topicPattern, rebalanceListener);
}
else {
subscribingConsumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), // NOSONAR
rebalanceListener);
}
}
else {
List<TopicPartitionOffset> topicPartitionsToAssign =
Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
this.definedPartitions = Collections.synchronizedMap(
new LinkedHashMap<>(topicPartitionsToAssign.size()));
for (TopicPartitionOffset topicPartition : topicPartitionsToAssign) {
this.definedPartitions.put(topicPartition.getTopicPartition(),
new OffsetMetadata(topicPartition.getOffset(), topicPartition.isRelativeToCurrent(),
topicPartition.getPosition()));
}
subscribingConsumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
}
}
@Override
public void run() {
...
}
}
이번 1편, 2편, 3편을 거치면서 평소 궁금했던 컨슈머의 동작 과정에 대해 알아보게 되었다.
사실 이런 구현체 코드를 탐구하는 것이 익숙치 않아 어디까지 내가 보고 이해할수있을까 싶었는데, 막상 보다보니 또 생각보다 수월하게(?) 읽히는 느낌도 있었다.
역시 굴지의 개발자들이 작성한 코드인가...! 물론 내 이해가 조금 잘못되었을 수도 있겠지만, 읽은 내용을 리서치하고, 공식문서를 바탕으로 찾아보면서 생각과 그리 다르지 않구나 느꼈다.
컨슈머와 브로커의 관계, 내부 이벤트를 기반으로 동작하는 내용과 각 동작에 대한 순서를 살펴보면서 여러 인사이트를 얻은 것 같다.
찾다보니 Kafka에서 관리하는 지라가 오픈되어있었는데, 조만간 쉬운 난이도를 기준으로 컨트리뷰트해보고 싶은 마음도 생겼다!!
무튼 컨슈머 대장정을 지내고나니 몰랐던걸 알게된 뿌듯함과, 앞으로의 목표도 생겨서 의미있는 시간이 된 듯 하다. 😃