Kafka 컨슈머가 갑자기 죽었을 때 어떤 동작이 일어날까? (Kafka Client 3편)

1

내 공부

목록 보기
7/7
post-thumbnail

배경

1편과 2편을 통해서 컨슈머가 종료되었을 때 컨슈머 서비스와 브로커에서 일어나는 일련의 동작들에 대해서 알아봤다. 이 과정에서 디버깅 시 겪었던 자동컨슘(?) 상황도 이해할 수 있게 되었다. 이번엔 반대로 그러면 컨슈머가 생겨났을 때, 컨슈머 서비스와 브로커에선 각각 어떤 동작이 일어나는지 확인해보고자 한다!

1. 브로커가 컨슈머의 생존여부를 감지하는 방법 (Kafka Client 1편)
2. 컨슈머가 종료될 때 브로커로 전달되는 내용 (정상종료 / 비정상 종료)
3. 컨슈머가 살아났을 때 브로커로부터 가져오는 데이터

뇌피셜을 정성스럽게 설명해보겠습니다.

간단하게 생각해보자. Consumer가 재기동 되면, 포함된 브로커로 poll 요청을 하게 될 태고, 새로운 Consumer가로부터 요청이 왔음을 감지한 Broker는 리벨런싱 작업을 하게 될 것이다.
리벨런싱 이후 추가된 컨슈머까지 정상적으로 이벤트를 수신하게되지 않을까 한다.

실제로 비슷한 플로우를 통해서 브로커에 등록 및 컨슈머가 데이터를 소비하게 된다.

진짜 동작을 알아보자.

Consumer가 처음 동작하기 시작한 시점부터 알아보자.
Spring boot Application이 동작하면서 Application Context들을 띄우게 되고
SmartLifecycle 인터페이스를 구현하는 모든 Bean을 찾아서 start (=doStart() 함수)를 호출하게 된다.

SmartLivecycle인터페이스의 위치를 찾아 거슬러 올라가보면
AbstractMessageListenerContainer > GenericMessageListenerContainer > MessageListenerContainer > "SmartLifecycle"


public class ConcurrentMessageListenerContainer extends AbstractMessageListenerContainer {
	...
    ...
    @Override
	protected void doStart() {
		if (!isRunning()) {
			checkTopics(); // 토픽 유효성 검사
            
            //ContainerProperties 객체는 메시지 처리 방식에 대한 정의를 담고있다. (어느 토픽,파티션에서 어떤 주기,크기로)
			ContainerProperties containerProperties = getContainerProperties(); 
            
            //파티션의 offset 조회 (없으면 null)
			TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
			
            // 동시 소비 스레드(concurrency)가 파티션 수보다 많으면 경고를 내보냄. → 파티션보다 스레드가 많아봤자 의미가 없으니 concurrency를 topicPartitions.length로 조정.
            if (topicPartitions != null && this.concurrency > topicPartitions.length) {
				this.logger.warn(() -> "When specific partitions are provided, the concurrency must be less than or "
						+ "equal to the number of partitions; reduced from " + this.concurrency + " to "
						+ topicPartitions.length);
				this.concurrency = topicPartitions.length;
			}
			clearState(); // 설정 초기화 
			setRunning(true); // 실행상태로 변경

			for (int i = 0; i < this.concurrency; i++) {
            	// 실제 메시지를 소비하는 객체 생성
				KafkaMessageListenerContainer<K, V> container =
						constructContainer(containerProperties, topicPartitions, i);
				configureChildContainer(i, container);
				if (isPauseRequested()) {
					container.pause();
				}
				container.start(); // KafkaMessageListenerContainer.doStart() 호출부
				this.containers.add(container);
			}
		}
        
        ...
        ...
	}

}

KafkaMessageListnerContaier 객체 생성 후, doStart()를 통해 컨테이너를 실행시키게 된다.
이때, 내부적으로 실제 컨슘을 담당하게될 ListenerContainer (스레드)를 만들고, containerExecutor를 통해서 실행하게 된다.

사실 굳이 "그냥 비동기 수행하지 왜 ContainerExecutor 만들어서 수행하지?"라는 생각이 들었지만, 작고 어린 나의 시각에선 별개의 스레드에서 돈다는 명확한 실행장치가 필요했던걸까..? 싶기도 하다. 혹은 그냥 스레드 이름을 지정하고싶어서...!? 로깅 및 디버깅에 유리하니까..!? - 헉 이거같다..! - 사실 잘 모름

public class KafkaMessageListenerContainer {

	...
    ...
	@Override
	protected void doStart() {
    	// 이미 실행중이라면 return
		if (isRunning()) { 
			return;
		}
		if (this.clientIdSuffix == null) { // stand-alone container
			checkTopics();
		}
		ContainerProperties containerProperties = getContainerProperties();

		Object messageListener = containerProperties.getMessageListener();
        
        // consumerExecutor 생성 (각 KafkaConsumer Container가 독립된 스레드에서 메시지를 소비하도록 하기 위해)
		AsyncTaskExecutor consumerExecutor = containerProperties.getListenerTaskExecutor();
		if (consumerExecutor == null) {
			consumerExecutor = new SimpleAsyncTaskExecutor(
					(getBeanName() == null ? "" : getBeanName()) + "-C-");
			containerProperties.setListenerTaskExecutor(consumerExecutor);
		}
        
		GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
		ListenerType listenerType = determineListenerType(listener);
		ObservationRegistry observationRegistry = containerProperties.getObservationRegistry();
		if (observationRegistry.isNoop()) {
			ApplicationContext applicationContext = getApplicationContext();
			if (applicationContext != null && containerProperties.isObservationEnabled()) {
				ObservationRegistry reg = applicationContext.getBeanProvider(ObservationRegistry.class)
						.getIfUnique();
				if (reg != null) {
					observationRegistry = reg;
				}
			}
		}
        
        // 생성자에서 Subscribe -> 토픽만 있는 경우 브로커로부터 파티션 정보 받아옴 (리벨런싱 동작을 수반함)
		this.listenerConsumer = new ListenerConsumer(listener, listenerType, observationRegistry);
		setRunning(true);
		this.startLatch = new CountDownLatch(1);
        
        // CompletableFuture.runAsync(this.listenerConsumer); 를 통해 ListenerConsumer의 run() 함수 수행
		this.listenerConsumerFuture = consumerExecutor.submitCompletable(this.listenerConsumer);
		try {
        	// Latch를 통해 수행을 특정 시간동안 대기. 실패시 로직 수행
			if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(),
					TimeUnit.MILLISECONDS)) {

				this.logger.error("Consumer thread failed to start - does the configured task executor "
						+ "have enough threads to support all containers and concurrency?");
				publishConsumerFailedToStart();
			}
		}
		catch (@SuppressWarnings(UNUSED) InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}
    
    ...
    ...
}

ListenerConsumer의 생성자 로직에는 각각 설정들을 적용하면서, 브로커와의 통신을 통해 어떤 토픽의 파티션을 소비하게되는지 설정값을 가져오게된다.
내부 subscribeOrAssignTopics() 함수를 살펴보게되면 파티션 정보가 없으면, 브로커에게 리벨런싱 및 파티션 정보를 얻어오게 된다. (subscribe 과정)

이후, 우리가 이전에(1편,2편)에 봤던 run() 함수를 consumerExecutor.submitCompletable(this.listenerConsumer);통해 수행하면서 poll() 과정이 시작된다.

public class ListenerConsumer {
	ListnerConsumer(...) {
    	...
        ...
        subscribeOrAssignTopics(consumer);
        ...
    }
    
    private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscribingConsumer) {
		if (KafkaMessageListenerContainer.this.topicPartitions == null) {
			ConsumerRebalanceListener rebalanceListener = new ListenerConsumerRebalanceListener();
			Pattern topicPattern = this.containerProperties.getTopicPattern();
			if (topicPattern != null) {
				subscribingConsumer.subscribe(topicPattern, rebalanceListener);
			}
			else {
				subscribingConsumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), // NOSONAR
						rebalanceListener);
			}
		}
		else {
			List<TopicPartitionOffset> topicPartitionsToAssign =
					Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
			this.definedPartitions = Collections.synchronizedMap(
					new LinkedHashMap<>(topicPartitionsToAssign.size()));
			for (TopicPartitionOffset topicPartition : topicPartitionsToAssign) {
				this.definedPartitions.put(topicPartition.getTopicPartition(),
						new OffsetMetadata(topicPartition.getOffset(), topicPartition.isRelativeToCurrent(),
								topicPartition.getPosition()));
			}
			subscribingConsumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
		}
	}
    
    @Override
    public void run() {
		...
	}
}

이번 1편, 2편, 3편을 거치면서 평소 궁금했던 컨슈머의 동작 과정에 대해 알아보게 되었다.
사실 이런 구현체 코드를 탐구하는 것이 익숙치 않아 어디까지 내가 보고 이해할수있을까 싶었는데, 막상 보다보니 또 생각보다 수월하게(?) 읽히는 느낌도 있었다.
역시 굴지의 개발자들이 작성한 코드인가...! 물론 내 이해가 조금 잘못되었을 수도 있겠지만, 읽은 내용을 리서치하고, 공식문서를 바탕으로 찾아보면서 생각과 그리 다르지 않구나 느꼈다.

컨슈머와 브로커의 관계, 내부 이벤트를 기반으로 동작하는 내용과 각 동작에 대한 순서를 살펴보면서 여러 인사이트를 얻은 것 같다.
찾다보니 Kafka에서 관리하는 지라가 오픈되어있었는데, 조만간 쉬운 난이도를 기준으로 컨트리뷰트해보고 싶은 마음도 생겼다!!

무튼 컨슈머 대장정을 지내고나니 몰랐던걸 알게된 뿌듯함과, 앞으로의 목표도 생겨서 의미있는 시간이 된 듯 하다. 😃

profile
시은이의 살아남기 시리즈!

0개의 댓글