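Kafka messages can be received by configuring a MessageListenerContainer and annotating listener methods with @KafkaListener.
A MessageListenerContainer works with several listener interfaces (details are in the Spring Kafka message listeners documentation).
ConcurrentMessageListenerContainer is composed of one or more KafkaMessageListenerContainer instances, which is how it supports multi-threaded processing.
@KafkaListener is the annotation that designates a bean method as a listener. The annotated method ends up wrapped in a MessagingMessageListenerAdapter.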
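import org.springframework.kafka.annotation.KafkaListener;

public class Listener {

    // Receives each record value from "myTopic" as a String.
    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }
}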
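To make the "wrapped in an adapter" idea concrete, here is a minimal plain-Java sketch. It is not Spring's MessagingMessageListenerAdapter and uses no Spring classes. The names SketchListener, SketchListenerAdapter, and ListenerAdapterSketch are hypothetical, and reflection is only an assumption about how such forwarding could be done. It shows an adapter object that holds a bean plus one of its methods and passes each incoming payload to that method.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a listener bean. In Spring Kafka the method
// would carry @KafkaListener; here it is a plain public method.
class SketchListener {
    final List<String> received = new ArrayList<>();

    public void listen(String data) {
        received.add(data);
    }
}

// Simplified model of an adapter (an assumption, not Spring's implementation):
// it stores the bean and the target method, then forwards payloads to it.
class SketchListenerAdapter {
    private final Object bean;
    private final Method method;

    SketchListenerAdapter(Object bean, String methodName, Class<?> payloadType)
            throws NoSuchMethodException {
        this.bean = bean;
        this.method = bean.getClass().getMethod(methodName, payloadType);
    }

    // Called once per incoming message; invokes the wrapped bean method.
    void onMessage(Object payload) throws ReflectiveOperationException {
        method.invoke(bean, payload);
    }
}

class ListenerAdapterSketch {
    public static void main(String[] args) throws Exception {
        SketchListener bean = new SketchListener();
        SketchListenerAdapter adapter =
                new SketchListenerAdapter(bean, "listen", String.class);
        adapter.onMessage("hello");
        System.out.println(bean.received); // prints [hello]
    }
}
```

The point of the indirection is that the container only needs to know about one adapter type, while the listener method itself can stay an ordinary bean method.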
@EnableKafka must be set on a @Configuration class. That class also serves as the place to define the listener container factory used to build the ConcurrentMessageListenerContainer.
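import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
            kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3); // number of child containers (consumer threads)
        factory.getContainerProperties().setPollTimeout(3000); // poll timeout in milliseconds
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        ...
        return props;
    }
}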
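To get a feel for what a concurrency of 3 means, here is a small plain-Java model that uses no Spring or Kafka classes. The class ConcurrencySketch and the method distribute are hypothetical, and the round-robin order is an assumption chosen for illustration. When a listener subscribes by topic, the real assignment is decided by Kafka's consumer group protocol. The sketch only shows the counting: partitions are split across at most `concurrency` child containers, and no more children are used than there are partitions.

```java
import java.util.ArrayList;
import java.util.List;

class ConcurrencySketch {

    // Splits partition ids 0..partitionCount-1 across child containers.
    // The number of children is capped at the number of partitions.
    static List<List<Integer>> distribute(int partitionCount, int concurrency) {
        if (concurrency < 1) {
            throw new IllegalArgumentException("concurrency must be >= 1");
        }
        int children = Math.min(concurrency, partitionCount);
        List<List<Integer>> assignment = new ArrayList<>();
        for (int i = 0; i < children; i++) {
            assignment.add(new ArrayList<>());
        }
        // Round-robin is an illustrative choice, not Kafka's assignor.
        for (int p = 0; p < partitionCount; p++) {
            assignment.get(p % children).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(distribute(6, 3)); // prints [[0, 3], [1, 4], [2, 5]]
        System.out.println(distribute(5, 3)); // prints [[0, 3], [1, 4], [2]]
        System.out.println(distribute(2, 3)); // prints [[0], [1]]
    }
}
```

In this model, raising the concurrency above the partition count adds no throughput, since each partition is handled by exactly one child at a time.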
For other options, see the detailed guide: https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/listener-annotation.html
Sources
- https://velog.io/@youmakemesmile/Spring-KafkaKafka-Consumer-%EC%A0%95%EB%A6%AC
- https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/message-listeners.html
- https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/message-listener-container.html#kafka-container
- https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/message-listener-container.html#using-ConcurrentMessageListenerContainer
- https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/listener-annotation.html