Kafka 컨슈머 config

h블로그·2021년 11월 19일
1

기본 설정 외에 필요해서 추가한 설정

ConsumerConfiguration.java 예시

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, Example> orderBoxTrackingListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Example> factory = new ConcurrentKafkaListenerContainerFactory<>();
        ObjectMapper objectMapper = new ObjectMapper()
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        JsonDeserializer jsonDeserializer = new JsonDeserializer(Example.class, objectMapper);
        factory.setConsumerFactory(getConsumerFactory(jsonDeserializer));
        factory.setErrorHandler(errorLog());
        return factory;
    }

Dto에 없는 property 있어도 에러를 발생하지 않겠다는 설정

new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

errorHandler 설정

consumer 이하 에서 발생한 error handler 잡는 부분.
나는 error를 잡아서 로그로 찍는 부분만 구현해서 아래 메소드만 추가로 더 구현했었다.

private ErrorHandler errorLog() {
        return (exception, data) -> log.error("Handle Consumer Exception: {} | topic: {} | key: {} | partition: {} | offset: {}",
                exception.getMessage(), data.topic(), data.key(), data.partition(), data.offset(), exception);
    }

DLT 토픽을 가지게 되는 경우 setErrorHandler() 이쪽에 아래처럼 구현하면 된다.

factory.setErrorHandler(new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(kafkaTemplate),
                new FixedBackOff(1000L, 2L)));
  • DeadLetterPublishingRecoverer 는 데드 레터 토픽에 메시지를 전송할 때 사용
  • 1000L 간격으로 2L 시도하고, DLT 토픽으로 보낸단 의미
profile
😎🙈🙈🙈🤓

0개의 댓글