ksqlDB 실습

이지민·2023년 5월 3일
0

Quick Start

시나리오

  • Mountain View 주변의 라이더 찾기
  • Mountain View 위치 (lat, long): (37.4133, -122.1162)
  • 주변 기준 : 5마일 / 10마일

Stream: riderLocations 생성

라이더의 위치에 대한 스트림

ksql> CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
  WITH (kafka_topic='locations', value_format='json', partitions=1);
 
 
 Message       
----------------
 Stream created
----------------
keyvalue
profileId라이더 ID
latitude위도
longitude경도

parameterdescription
kafka_topic스트림을 흘려줄 topic의 이름
토픽이 없으면 생성하고 이미 있으면 해당 토픽에 스트림 생성
value_formatjson
{"profileId": "c2309eec", "latitude": 37.7877, "longitude": -122.4205}
partitions토픽 생성시 지정한 파티션 수
이미 토픽이 생성돼 있다면 필요 없는 파라미터

Table: currentLocation 생성

라이더의 현재 위치에 대한 구체화 뷰 (materialized view)

ksql> CREATE TABLE currentLocation AS
  SELECT profileId,
         LATEST_BY_OFFSET(latitude) AS la,
         LATEST_BY_OFFSET(longitude) AS lo
  FROM riderlocations
  GROUP BY profileId
  EMIT CHANGES;
 
 Message                                     
----------------------------------------------
 Created query with ID CTAS_CURRENTLOCATION_3
-------------------------T---------------------
 
...
라이더 스트림 정보 입력
...
 
 
ksql> SELECT * from currentLocation;
>
+--------------------------------+--------------------------------+--------------------------------+
|PROFILEID                       |LA                              |LO                              |
+--------------------------------+--------------------------------+--------------------------------+
|18f4ea86                        |37.3903                         |-122.0643                       |
|4a7c7b41                        |37.4049                         |-122.0822                       |
|4ab5cbad                        |37.3952                         |-122.0813                       |
|4ddad000                        |37.7857                         |-122.4011                       |
|8b6eae59                        |37.3944                         |-122.0813                       |
|c2309eec                        |37.7877                         |-122.4205                       |
Query terminated

Table: riderNearMountainView

얼마나 많은 라이더가 MountainView에서 얼마나 떨어져 있는지 보여주는 구체화 뷰 (materialized view)

ksql> CREATE TABLE ridersNearMountainView AS
>  SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
>         COLLECT_LIST(profileId) AS riders,
>         COUNT(*) AS count
>  FROM currentLocation
>  GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);
 
 Message                                            
-----------------------------------------------------
 Created query with ID CTAS_RIDERSNEARMOUNTAINVIEW_5
-----------------------------------------------------

Push Query: riderLocations

Mountain View에서 5마일 이내에 들어온 라이더의 위치의 스트림 정보를 가져옴
Push query: 실시간으로 output이 나옴

아래와 같이 select 문 실행 후 insert하면 Mountain View와 5마일 이내에 라이더가 들어오면 output row가 추가된다.

ksql> SELECT * FROM riderLocations
>  WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
+--------------------------------+--------------------------------+--------------------------------+
|PROFILEID                       |LATITUDE                        |LONGITUDE                       |
+--------------------------------+--------------------------------+--------------------------------+
|4ab5cbad                        |37.3952                         |-122.0813                       |
|8b6eae59                        |37.3944                         |-122.0813                       |
|4a7c7b41                        |37.4049                         |-122.0822                       |
 
Press CTRL-C to interrupt
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
>INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
>INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
>INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
>INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
>INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);

Pull Query: riderNearMountainView

현재 Mountain View에서 10마일 이내 라이더의 위치의 스트림 정보를 가져옴
Pull query: 기존의 SQL과 비슷 (traditional request-response model)

ksql> SELECT * from ridersNearMountainView WHERE distanceInMiles <= 10;
+--------------------------------+--------------------------------+--------------------------------+
|DISTANCEINMILES                 |RIDERS                          |COUNT                           |
+--------------------------------+--------------------------------+--------------------------------+
|0.0                             |[4ab5cbad, 8b6eae59, 4a7c7b41]  |3                               |
|10.0                            |[18f4ea86]                      |1                               |
Query terminated
profile
개발하는 사람입니다.

0개의 댓글