Apache Kakfa에서 버전에 맞게 다운로드
2.~ 버전과 3.~ 버전의 차이만 확인해보시고 다운로드 받으면 됩니다.
전 2.5.0 버전과 3.7.1버전을 다운받았습니다.
-> 에러 찾을려고 2개 다운 받은 것이니 한개만 다운 받으셔도 됩니다.
window 11 사용 중입니다.
전 바로 C:\kafka_2.12-2.5.0
아래로 경로 설치 했습니다.
수정 한 파일입니다.
zookeeper.properties
기본 설정값 사용해도 됩니다만은 모든 경로를 절대 경로로 지정했는데,
만약 window powershell에서 서버 시작을 C: , C:\kafka, C:\kafka\bin
등 실행위치에 따른 상대경로로 파일이 계속 생성되어 수정했습니다.
dataDir=/kafka_2.12-2.5.0/zookeeper
server.properties
경로 수정은 위와 같은 이유로 수정
서버 포트 변경을 한 이유는 H2 DB를 사용하다 보니 포트 충돌로인해 9092 -> 9093으로 수정했습니다.
listeners=PLAINTEXT://localhost:9093
advertised.listeners=PLAINTEXT://localhost:9093
log.dirs=/kafka_2.12-2.5.0/data
C:\kafka_2.12-2.5.0\data
아닌/kafka_2.12-2.5.0/data
설정했습니다.
zookeeper 실행
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
kafka 실행
.\bin\windows\kafka-server-start.bat .\config\server.properties
Topic 생성
.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9093 --topic management_car_exception_topic --create
Topic 확인
.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9093 --list
Producer
.\bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9093 --topic management_car_exception_topic
--property "parse.key=true" --property "key.separator=:"
consumer
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9093 --topic test_elasticsearch_sink --from-beginning
gradle 설정 추가
implementation 'org.springframework.kafka:spring-kafka'
Producer 설정
KafkaConstants.KAFKA_BOOTSTRAP_SERVERS = localhost:9093
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
}
}
Producer 전송
보내는 value는 type은 자유롭게 전달하면 됩니다.
KafkaConstants.MANAGE_CAR_PRODUCER = topic 전송 시 key
KafkaConstants.SEND_TELEGRAM_TOPIC = topic 명
@Component
@RequiredArgsConstructor
public class KafkaSendMessage {
public final KafkaTemplate kafkaTemplate;
public void kafkaSendAlarmMessage(TopicExceptionDto exceptionTopic){
kafkaTemplate.send(KafkaConstants.SEND_TELEGRAM_TOPIC, KafkaConstants.MANAGE_CAR_PRODUCER , exceptionTopic);
}
}
Consumer 설정
JsonDeserializer.class
를 많이 사용한다.@Configuration
public class KafkaConsumerConfig {
@Bean
public DefaultKafkaConsumerFactory consumerFactory(){
return new DefaultKafkaConsumerFactory<>(consumerConfig(), new StringDeserializer(), new StringDeserializer());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
private Map<String, Object> consumerConfig() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000);
return props;
}
}
Consumer
acknowledgment.acknowledge();
은 수동 커밋으로 Config > AckMode 설정입니다.@Component
@RequiredArgsConstructor
public class KafkaConsumer implements AcknowledgingMessageListener<String, String> {
public final TelegramSend telegramSend;
@Override
@KafkaListener(topics = KafkaConstants.SEND_TELEGRAM_TOPIC,
groupId = "management_car_consumer_1",
containerFactory = "kafkaListenerContainerFactory")
public void onMessage(ConsumerRecord<String, String> consumerRecord,
Acknowledgment acknowledgment) {
telegramSend.send(consumerRecord.value());
acknowledgment.acknowledge();
}
}
이렇게 보낸값을 Consumer에서 받아
이후에 처리해야할 일들을 작성하면 되겠습니다