kafka Streams를 java로 구현해보자!

Karim·2021년 12월 21일
4

kafka 예제

목록 보기
3/3
post-thumbnail

1. Version

💬

  • Kafka : 2.6.0
  • grdle : kafka-clients.2.8.1
  • grdle : kafka-streams.2.8.1

2. build.gradle

💬 build.gredle dependencies

dependencies {
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.8.1'
    compile group: 'org.apache.kafka', name: 'kafka-streams', version: '2.8.1'
}

3. Kafka Streams 개념

💬 link

kafka Streams를 알아보자!

4. Kafka Streams 구현

💻 java code

예제는 간단!

  • kafka 설치한 서버에 어떠한 시스템이 메세지를 게시하면
    그 해당 메세지를 실시간으로 조건 필터하는 기능이다.!

    필터 조건 : 들어오는 글자 수가 5개 초과만 데이터를 전달한다.

package com.karim.kafkaBasis.kafkaStreams;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class KafkaStreamsBasis {

    private static final String KAFKA_SINGLE_IP = "192.168.124.222:9092";
    private static final String RECEIVE_TOPIC_NAME = "karim-rcv-topic";
    private static final String SEND_TOPIC_NAME = "karim-send-topic";

    public static void main(final String[] args) throws Exception {
        Properties props = new Properties();
        // 카프카 스트림즈를 유일하게 구분할 ID값
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        // 스트림즈에 접근할 카프카 broker 정보
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SINGLE_IP);
        // 데이터를 어떤 형식으로 Read/Write 할지 성정 (키/값의 데이터 타입을 지정)
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // 스트림 토폴로지를 정의하기 위한 빌더
        StreamsBuilder builder = new StreamsBuilder();

        // 소스 프로세서 동작 -> RECEIVE_TOPIC_NAME 토픽으로 부터 KStream 객체를 만든다.
        KStream<String, String> stringLength5Over = builder.stream(RECEIVE_TOPIC_NAME);

        // 스트림 프로세서 동작 ->
        // RECEIVE_TOPIC_NAME 토픽에서 가져온 데이터 중
        // length 가 5를 넘는 경우의 값만 남도록 필터링 하여 KStream 객체를 새롭게 생성
        KStream<String, String> filterStream = stringLength5Over.filter(
                ((key, value) -> value.length() > 5)
        );

        // 싱크 프로세서 동작 ->
        // SEND_TOPIC_NAME 토픽으로 KStream 데이터를 전달한다.
        filterStream.to(SEND_TOPIC_NAME);

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

5. Kafka Console Producer

💬 console view

  • 글자수가 5개 초과인 Karim velog cuteeeee-와 5개 미만인 Kari를 차례대로 producer한다.
[karim@kafka-single bin]$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic karim-rcv-topic
> Karim velog cuteeeee-
> Kari

6. Kafka Console Consumer

💬 console view

  • Kari 은 Stream 필터 조건에 만족하지 않으므로 Stream쪽에서 걸러짐!
[karim@kafka-single bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic karim-send-topic
Karim velog cuteeeee-

📚 참고

profile
나도 보기 위해 정리해 놓은 벨로그

0개의 댓글