Kafka streams 추가

S_H_H·2024년 7월 9일
0

Kafka 미니 플젝

목록 보기
4/7
A Topic : management_car_exception_topic
B Topic : send_telegram_topic

Application 에서 에러 발생 시 A Topic 으로 전송을 하게 되고 Streams를 통해 A -> B Topic으로 전송해주면
B Topic의 Consumer가 확인하여 담당자에게 전송하도록 하는 파이프 라인 구축

B Topic 부터 생성

.\bin\windows\kafka-topics.bat --bootstarp-server localhost:9093 --topic send_telegram_topic --create

Kafka Streams 추가

gradle 설정 추가

implementation 'org.apache.kafka:kafka-streams'

Streams 설정 추가

@Slf4j
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean
    KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, KafkaConstants.KAFKA_STREAM_APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        return new KafkaStreamsConfiguration(props);
    }

}

Streams 추가

A : KafkaConstants.EXCEPTION_TOPIC 
B : KafkaConstants.SEND_TELEGRAM_TOPIC
A -> B 로 Topic 전달

    @Bean
    public KStream<String, TopicExceptionDto> kStream(StreamsBuilder streamsBuilder) {
        final Serializer<TopicExceptionDto> jsonSerializer = new JsonSerializer();
        final Deserializer<TopicExceptionDto> jsonDeserializer = new JsonDeserializer(TopicExceptionDto.class, false);
        final Serde<TopicExceptionDto> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        KStream<String, TopicExceptionDto> stream = streamsBuilder.stream(KafkaConstants.EXCEPTION_TOPIC, Consumed.with(Serdes.String(), jsonSerde));

        stream
                .mapValues(TopicExceptionDto::ToSendMessage)
                .peek((k, v) -> log.info("KStream log : key - {}, value - {}", k, v))
                .to(KafkaConstants.SEND_TELEGRAM_TOPIC);
        return stream;
    }

해당 토픽 value가 Json 형식으로 되어있어 Streams에서 TopicExceptionDto로 받기 위해 Serdes를 사용했습니다.
A Topic으로 메시지가 전달되면 Streams를 통해 B Topic으로 전달되는 것을 확인할 수 있습니다.

Streams Topic 확인

해당 Consumer가 데이터를 잘 가지고 오는지 확인해 보면 된다.

    @KafkaListener(topics = KafkaConstants.SEND_TELEGRAM_TOPIC,
            groupId = "management_car_consumer_1",
            containerFactory = "kafkaListenerContainerFactory")
    public void onMessage(ConsumerRecord<String, String> consumerRecord,
                          Acknowledgment acknowledgment) {

        telegramSend.send(consumerRecord.value());
        acknowledgment.acknowledge();

    }

위 Streams통해 값이 String으로 변환되었기에 Consumer에서도 String으로 받고있습니다.
받은 정보를 Telegram으로 전송 시, 알림을 확인할 수 있고 전송이 완료된 정보는 commit처리를 하게 됩니다.

Processed 1 total records, ran 0 punctuators, and committed 1 total tasks since the last update

알림 기능은 텔레그램 전송 방법에서 확인해보시면 됩니다.

profile
LEVEL UP

0개의 댓글