A Topic : management_car_exception_topic
B Topic : send_telegram_topic
Application 에서 에러 발생 시 A Topic 으로 전송을 하게 되고 Streams를 통해 A -> B Topic으로 전송해주면
B Topic의 Consumer가 확인하여 담당자에게 전송하도록 하는 파이프 라인 구축
B Topic 부터 생성
.\bin\windows\kafka-topics.bat --bootstarp-server localhost:9093 --topic send_telegram_topic --create
gradle 설정 추가
implementation 'org.apache.kafka:kafka-streams'
Streams 설정 추가
@Slf4j
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean
KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, KafkaConstants.KAFKA_STREAM_APPLICATION_NAME);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BOOTSTRAP_SERVERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return new KafkaStreamsConfiguration(props);
}
}
Streams 추가
A : KafkaConstants.EXCEPTION_TOPIC
B : KafkaConstants.SEND_TELEGRAM_TOPIC
A -> B 로 Topic 전달
@Bean
public KStream<String, TopicExceptionDto> kStream(StreamsBuilder streamsBuilder) {
final Serializer<TopicExceptionDto> jsonSerializer = new JsonSerializer();
final Deserializer<TopicExceptionDto> jsonDeserializer = new JsonDeserializer(TopicExceptionDto.class, false);
final Serde<TopicExceptionDto> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
KStream<String, TopicExceptionDto> stream = streamsBuilder.stream(KafkaConstants.EXCEPTION_TOPIC, Consumed.with(Serdes.String(), jsonSerde));
stream
.mapValues(TopicExceptionDto::ToSendMessage)
.peek((k, v) -> log.info("KStream log : key - {}, value - {}", k, v))
.to(KafkaConstants.SEND_TELEGRAM_TOPIC);
return stream;
}
해당 토픽 value가 Json 형식으로 되어있어 Streams에서 TopicExceptionDto
로 받기 위해 Serdes
를 사용했습니다.
A Topic으로 메시지가 전달되면 Streams를 통해 B Topic으로 전달되는 것을 확인할 수 있습니다.
해당 Consumer가 데이터를 잘 가지고 오는지 확인해 보면 된다.
@KafkaListener(topics = KafkaConstants.SEND_TELEGRAM_TOPIC,
groupId = "management_car_consumer_1",
containerFactory = "kafkaListenerContainerFactory")
public void onMessage(ConsumerRecord<String, String> consumerRecord,
Acknowledgment acknowledgment) {
telegramSend.send(consumerRecord.value());
acknowledgment.acknowledge();
}
위 Streams통해 값이 String으로 변환되었기에 Consumer에서도 String으로 받고있습니다.
받은 정보를 Telegram으로 전송 시, 알림을 확인할 수 있고 전송이 완료된 정보는 commit처리를 하게 됩니다.
Processed 1 total records, ran 0 punctuators, and committed 1 total tasks since the last update
알림 기능은 텔레그램 전송 방법에서 확인해보시면 됩니다.