Kafka Streams 파이썬으로 구현하기 (with ksqlDB)

이지민·2023년 5월 6일
0

Kafka Streams란?

" Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's server-side cluster technology. "

(출처: https://www.confluent.io/blog/hello-world-kafka-connect-kafka-streams/, Confluent Blog)

카프카 스트림즈는 토픽에 적재된 데이터실시간으로 변환하여 다른 토픽에 적재하는 라이브러리이다.
(출처: 아파치 카프카 애플리케이션 프로그래밍 with 자바, 책)

그런데!!!
""" 카프카 스트림즈 애플리케이션은 Java와 Scala로만 작성하고 배포할 수 있다. """

이 글을 쓰게 된 배경 및 목적

대부분의 개발 조직은 각자 상황에 맞게 인프라와 언어, 프레임워크 등의 기술 스택을 정립해서 쓰게된다. 만약 파이썬이나 Go를 주로 쓰는 조직에서 Kafka를 도입하고자 한다고 가정하자. 그런데 위에서 언급했듯이 카프카 스트림즈 애플리케이션은 Java와 Scala로만 작성이 가능하다. 그렇다면 파이썬이나 Go만 사용하는 조직은 이를 어떻게 해결해야 할까? 간단하게는 2가지 방법이 있을 수 있다. 첫번째, Java를 공부해서 도입한다. 물론 컴퓨터 전공을 한다하면 Java 정도는 충분히 할 수 있다. 하지만 조직관점에서 봤을 때는 운영을 하는데 사용하는 언어가 늘어가는 것은 분명 관리 포인트가 늘어나 부담이 된다. 그렇기 때문에 쉽지 않은 결정이다. 두번째, Kafka를 안쓴다. (하하...^^) 여튼 둘 다 좋은 방법은 아닌 듯 하다. 그래서 이 글에서는 언어에 상관없이 카프카 스트림즈를 사용할 수 있는 방법은 찾아보려고 한다.

필자는 Python이 편하기 때문에 이 글에서는 파이썬 기준으로 설명한다.

그럼 해결책은...?

바로 ksqlDB!!

ksqlDB란?

" ksqlDB is the streaming database for Apache Kafka®. With ksqlDB, you can write event streaming applications by using a lightweight SQL syntax. "

(출처: https://docs.ksqldb.io/en/0.8.x-ksqldb/concepts/ksqldb-and-kafka-streams/, ksqlDB Documents)

위 그림은 ksqlDB를 이해하기 딱 좋은 그림이다. ksqlDB는 카프카 스트림즈를 추상화하여 SQL 문으로 구현할 수 있게 한 것이다. 즉, SQL문을 통해 카프카 스트림즈를 사용할 수 있기 때문에 언어의 문제가 사라진다. 실제도 공식 문서를 보면, 다음과 같은 상황에서 ksqlDB를 사용하라고 한다.

" Want the power of Kafka Streams but you aren't on the JVM: use the ksqlDB REST API from Python, Go, C#, JavaScript, shell"

ksqlDB에 대해서는 별도의 글로 다루도록 하고 이 글에서는 예제를 통해 카프카 스트림즈와 ksqlDB를 비교하여 어떻게 사용하는지에 초점을 맞추려고 한다.

예제

모든 예제는 "아파치 카프카 애플리케이션 프로그래밍 with 자바" 책에서 가져온 것이다.
코드: https://github.com/bjpublic/apache-kafka-with-java/tree/master/Chapter3/3.5%20kafka-streams

[예제 1] Copy Stream

stream_log 토픽의 데이터를 stream_log_copy 토픽으로 옮겨오는 예제

JAVA (코드 출처)

package com.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class SimpleStreamApplication {

    private static String APPLICATION_NAME = "streams-application";
    private static String BOOTSTRAP_SERVERS = "localhost:9092";
    private static String STREAM_LOG = "stream_log";
    private static String STREAM_LOG_COPY = "stream_log_copy";

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream(STREAM_LOG);

        stream.to(STREAM_LOG_COPY);

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

    }
}

ksqlDB

ksqlDB에서 데이터 정의
출처: https://docs.ksqldb.io/en/latest/reference/sql/data-definition/
" This section covers how you create the structures that store your events. ksqlDB abstracts events as rows with columns and stores them in streams and tables. "
-> 그렇기 때문에 위의 예제와 100% 동일할 수는 없다. 대신 "data" column에 log를 작성하는 예제로 변경하여 구현했다.

1) stream_log 토픽 streamLog 스트림 생성

  • 데이터를 넣어줄 스트림을 생성한다.
ksql> CREATE STREAM streamLog (data VARCHAR)
  WITH (kafka_topic='stream_log', value_format='json', partitions=1);
  
 Message        
----------------
 Stream created 
----------------

2) streamLogCopy 스트림 생성

  • streamLog 스트림의 데이터를 복사해 올 streamLogCopy 스트림을 생성한다.
ksql> CREATE STREAM streamLogCopy AS SELECT * FROM streamLog EMIT CHANGES;

 Message                                    
--------------------------------------------
 Created query with ID CSAS_STREAMLOGCOPY_9 
--------------------------------------------

3) Push Query: streamLogCopy

  • streamLogCopy의 데이터를 Push Query를 통해 SELECT하여 데이터를 가져온다. (kafka_topic을 정해주지 않으면 대문자로 바뀌어 토픽이 자동 생성 된다. 이 경우 'STREAMLOGCOPY' 이름으로 토픽이 생성된다.)
ksql> SELECT * from streamLogCopy EMIT CHANGES;
+----------------------------------------------------------------------------------------------------+
|DATA                                                                                                |
+----------------------------------------------------------------------------------------------------+
  • 그 상태에서 streamLog 스트림에 데이터를 넣어준다.
ksql> INSERT INTO streamLog (data) VALUES ('Hello');
ksql> INSERT INTO streamLog (data) VALUES ('Jimin');
  • 그러면 SELECT 문 OUTPUT에 Hello, Jimin 데이터가 들어온 것을 확인할 수 있다.
ksql> SELECT * from streamLogCopy EMIT CHANGES;
+----------------------------------------------------------------------------------------------------+
|DATA                                                                                                |
+----------------------------------------------------------------------------------------------------+
|Hello                                                                                               |
|Jimin 
  • 실제 STREAMLOGCOPY 토픽을 consume 해보면 다음과 같이 결과를 확인할 수 있다.
$ kafka-console-consumer --bootstrap-server localhost:29092 --topic stream_log --from-beginning
{"DATA":"Hello"}
{"DATA":"Jimin"}
^CProcessed a total of 2 messages

$ kafka-console-consumer --bootstrap-server localhost:29092 --topic STREAMLOGCOPY --from-beginning
{"DATA":"Hello"}
{"DATA":"Jimin"}
^CProcessed a total of 2 messages

[예제 2] Stream-Table Join

user-product 주문 정보가 들어오면 제품을 어디로 보내야할지 user-address 매핑 테이블을 보고 결정하는 예제

JAVA (코드 출처)

package com.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Properties;

public class KStreamJoinKTable {

    private static String APPLICATION_NAME = "order-join-application";
    private static String BOOTSTRAP_SERVERS = "localhost:9092";
    private static String ADDRESS_TABLE = "address";
    private static String ORDER_STREAM = "order";
    private static String ORDER_JOIN_STREAM = "order_join";

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> addressTable = builder.table(ADDRESS_TABLE);
        KStream<String, String> orderStream = builder.stream(ORDER_STREAM);

        orderStream.join(addressTable, (order, address) -> order + " send to " + address).to(ORDER_JOIN_STREAM);

        KafkaStreams streams;
        streams = new KafkaStreams(builder.build(), props);
        streams.start();

    }

ksqlDB

1) order 스트림 생성

  • 주문 데이터를 받을 주문 (order) 스트림을 생성한다.
CREATE STREAM order (user VARCHAR, product VARCHAR)
  WITH (kafka_topic='order', value_format='json', partitions=1);
  
 Message        
----------------
 Stream created 
----------------

2) address 테이블 생성 및 user-address 매핑 데이터 준비

ksql> CREATE TABLE address (user VARCHAR PRIMARY KEY, address VARCHAR)
  WITH (kafka_topic='address', value_format='json', partitions=1);
  
 Message       
---------------
 Table created 
---------------
ksql> INSERT INTO address (user, address) VALUES ('Jimin', 'Seoul');
ksql> INSERT INTO address (user, address) VALUES ('John', 'San Francisco');
ksql> SELECT * FROM address EMIT CHANGES;
+------------------------------------------------------------------+------------------------------------------------------------------+
|USER                                                              |ADDRESS                                                           |
+------------------------------------------------------------------+------------------------------------------------------------------+
|Jimin                                                             |Seoul                                                             |
|John                                                              |San Francisco                                                     |
 
Press CTRL-C to interrupt

3) order 스트림과 address 테이블 조인

ksql> CREATE STREAM orderJoin AS
  SELECT 
     order.user AS user, 
     product,
     address
  FROM order
    LEFT JOIN address ON order.user = address.user
  EMIT CHANGES;

 Message                                 
-----------------------------------------
 Created query with ID CSAS_ORDERJOIN_15 
-----------------------------------------

4) orderJoin 스트림을 Push Query로 데이터 확인 & order 데이터 생성

  • orderJoin 스트림의 데이터를 SELECT 한다.
ksql> SELECT * from orderJoin EMIT CHANGES;
+-------------------------------------------+-------------------------------------------+-------------------------------------------+
|USER                                       |PRODUCT                                    |ADDRESS                                    |
+-------------------------------------------+-------------------------------------------+-------------------------------------------+
Press CTRL-C to interrupt
  • 그 상태에서 order 스트림에 2개의 주문 정보 (user, product)를 넣어준다.
ksql> INSERT INTO order (user, product) VALUES ('Jimin', 'Mac book');
ksql> INSERT INTO order (user, product) VALUES ('John', 'iPad');
  • 그러면 orderJoin 스트림을 SELECT한 쪽의 OUTPUT에 user의 address 정보가 매핑되어 데이터가 들어온 것을 확인할 수 있다.
ksql> SELECT * from orderJoin EMIT CHANGES;
+-------------------------------------------+-------------------------------------------+-------------------------------------------+
|USER                                       |PRODUCT                                    |ADDRESS                                    |
+-------------------------------------------+-------------------------------------------+-------------------------------------------+
|Jimin                                      |Mac book                                   |Seoul                                      |
|John                                       |iPad                                       |San Francisco                              |

Press CTRL-C to interrupt

Python에서 ksqlDB 사용하기

공식 문서: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-clients/
Python 라이브러리: https://github.com/bryanyang0528/ksql-python
예제: https://velog.io/@jm94318/ksqlDB-실습

공식 문서에서 소개한 ksql 라이브러리를 사용하면 다음과 같이 ksqlDB를 사용할 수 있다.

import logging

from ksql import KSQLAPI


logging.basicConfig(level=logging.DEBUG)
client = KSQLAPI('http://localhost:8088', timeout=None)


result = client.ksql('show tables')
print(result)

results = client.query("""SELECT * FROM riderLocations
  WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;""")
for result in results:
    print(result)
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): localhost:8088
DEBUG:urllib3.connectionpool:http://localhost:8088 "GET /info HTTP/1.1" 200 133
DEBUG:root:KSQL generated: SELECT * FROM riderLocations
  WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
[{"header":{"queryId":"transient_RIDERLOCATIONS_71790220042559025","schema":"`PROFILEID` STRING, `LATITUDE` DOUBLE, `LONGITUDE` DOUBLE"}},

{"row":{"columns":["4ab5cbad",37.3952,-122.0813]}},

{"row":{"columns":["8b6eae59",37.3944,-122.0813]}},

{"row":{"columns":["4a7c7b41",37.4049,-122.0822]}},
import logging

from ksql import KSQLAPI


logging.basicConfig(level=logging.DEBUG)
client = KSQLAPI('http://localhost:8088')

client.ksql("INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);")
client.ksql("INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);")
client.ksql("INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);")
client.ksql("INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);")
client.ksql("INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);")
client.ksql("INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);")
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): localhost:8088
DEBUG:urllib3.connectionpool:http://localhost:8088 "GET /info HTTP/1.1" 200 133
DEBUG:root:KSQL generated: INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
DEBUG:root:KSQL generated: INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
DEBUG:root:KSQL generated: INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
DEBUG:root:KSQL generated: INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
DEBUG:root:KSQL generated: INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
DEBUG:root:KSQL generated: INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);

이것이 가능한 이유는...

ksqlDB는 클라이언트가 ksqlDB 엔진(SQL문과 쿼리문을 처리)에 접근할 수 있게 REST API 제공하기 때문이다. 파이썬 ksql 라이브러리는 내부적으로 REST API를 호출하여 SQL문을 처리하고 있다. 따라서 이 REST API를 사용하면 어떤 언어를 사용하더라도 스트림즈를 구현할 수 있다.

다음글에서는 ksqlDB의 개념과 구조, Kafka Streams와의 비교 등 좀 더 ksqlDB의 이론과 심화 내용을 다뤄보도록 할 예정이다.

profile
개발하는 사람입니다.

0개의 댓글