Spring Kafka Consumer

이진호·2024년 5월 31일
0

Spring Kafka

목록 보기
1/1

MessageListenerContainer

@KafkaListener

  • Bean 메소드를 리스너로 지정하는 어노테이션.

  • MessagingMessageListenerAdapter로 감싸지게 됨.

    public class Listener {
    
        @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
        public void listen(String data) {
            ...
        }
    
    }
  • ConcurrentMessageListenerContainer 구성을 위한 listener container factory 역할의@Configuration 클래스에 @EnableKafka 설정 필요.

    @Configuration
    @EnableKafka
    public class KafkaConfig {
    
        @Bean
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                            kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(3);
            factory.getContainerProperties().setPollTimeout(3000);
            return factory;
        }
    
        @Bean
        public ConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            ...
            return props;
        }
    }
  • 이외 옵션은 상세 가이드 확인: https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/listener-annotation.html

출처

0개의 댓글