Spring Boot Kafka 이미 처리한 토픽 다시 처리되는 오류 해결

최민길(Gale)·2023년 9월 5일
1

Spring Boot 적용기

목록 보기
45/46

안녕하세요 이번 시간에는 Kafka에서 이미 처리한 토픽이 다른 API 실행 시 중복되어 처리되는 오류를 해결하는 포스팅을 해보도록 하겠습니다.

우선 발생한 오류를 먼저 말씀드리겠습니다. 다이어리 초대 요청을 보냈을 때 프로듀서에서 FCM 관련 토픽을 전달하면 컨슈머에서 해당 토픽을 캐치하여 로직을 처리합니다. 이후 메일 발송을 위해 프로듀서에서 메일 관련 토픽을 전달하면 메일 전송과 함께 FCM도 같이 전송되는 문제가 있었습니다.

문제의 원인을 찾아본 결과 Kafka에서 읽은 메시지가 커밋되지 않아 처리한 메시지가 읽지 않은 상태로 변경되어 지속적으로 해당 토픽을 listen한 것이 문제였습니다. Spring Boot의 @KafkaListner는 기본적으로 자동 커밋을 지원하지만 KafkaConfig를 새롭게 커스텀하는 과정에서 자동 커밋이 되지 않아 이런 문제가 발생했습니다.

그렇다면 Kafka에서는 어떻게 커밋을 이용하여 읽은 메시지를 처리할까요? Kafka의 경우 컨슈머가 메시지를 처리한 후 어디까지 읽었는지 Kafka 브로커에 알리는 프로세스로 커밋을 진행합니다. 이로 인해 메시지를 중복 처리하지 않고 장애 발생 시에도 마지막으로 메시지를 처리한 위치를 보존할 수 있어 안정적인 서비스를 구축할 수 있습니다.

Kafka의 커밋 프로세스는 다음과 같습니다.

  1. 메시지 소비 : 컨슈머가 브로커로부터 메시지를 받아 처리합니다.
  2. 오프셋 추적 : 컨슈머가 처리한 메시지의 오프셋(메시지 위치)를 추적합니다.
  3. 커밋 요청 : 특정 조건에 맞게 컨슈머 그룹의 오프셋 관리자에게 커밋을 요청합니다.
  4. 커밋 처리 : 오프셋 관리자가 커밋 요청을 받으면 각 컨슈머의 파티션 별로 가장 최근에 처리한 오프셋을 브로커에 저장합니다.
  5. 오프셋 관리 : 컨슈머 그룹은 컨슈머의 소비 상태를 오프셋 관리자에게 주기적으로 보고하여 어떤 오프셋까지 메시지를 처리했는지 추적하고 관리할 수 있습니다.

Kafka는 커밋 주기를 설정하여 커밋이 자동적으로 주기적으로 수행될 수 있도록 설정할 수 있으며, 커밋이 실패할 경우 에러 처리 메커니즘을 사용하여 커밋을 다시 시도하거나 에러를 로그에 기록하고 관리자에게 알릴 수 있습니다.

저는 매 요청마다 커밋을 자동적으로 수행하는 ENABLE_AUTO_COMMIT 옵션을 활성화하기 위해 KafkaConfig 클래스를 수정했습니다. ConsumerFactory에서 자동 커밋 옵션을 관리하기 때문에 이곳에서 configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); 코드를 추가하여 자동 커밋을 수행합니다. 이를 적용하면 처리된 토픽은 다시 요청되지 않고 한번만 수행됩니다.

    @Bean
    public ConsumerFactory<String, byte[]> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }
profile
저는 상황에 맞는 최적의 솔루션을 깊고 정확한 개념의 이해를 통한 다양한 방식으로 해결해오면서 지난 3년 동안 신규 서비스를 20만 회원 서비스로 성장시킨 Software Developer 최민길입니다.

0개의 댓글