SingleStoreDB Pipeline 을 이용한 Multi Table Insert

Jongsoo Noh·2023년 3월 4일
0
post-thumbnail

개요

SingleStoreDB Pipeline 은 다양한 데이터 소스에서 다양한 포맷의 원천 데이터를 빠른 속도로 Ingest 할 수 있도록 하는 기능입니다.

이번 글에서는 S3 Bucket 에 있는 CSV 파일을 Pipeline 을 통해 로딩할 때 특정 컬럼의 값에 따라 여러 테이블로 분기하여 Insert 하는 예제를 살펴 보려고 합니다.

다음과 같은 세가지 정도의 방법으로 위의 요구사항을 구현할 수 있습니다.

  • 하나의 pipeline 생성, where 문으로 컬럼 값에 따라 다른 Table 에 Insert
  • 하나의 pipeline 생성, pipeline 에서 procedure 로 데이터 전달, procedure 에서 3개의 INSERT SELECT 구문 실행
  • 하나의 pipeline 생성, 1개의 Table 에 Insert, 컬럼값에 따라 3개의 view 를 만들어 활용

다음은 4 vCPU, 8 GB 메모리 환경에서 S3와 호환되는 Oracle Cloud 에서 각각을 테스트한 결과입니다. 장비의 크기가 작으므로 소요 시간은 참고만 하시기 바랍니다.

1) 3개의 PIPELINE 생성

# 테이블 각각 생성 : Size 컬럼 값에 따라 구분

create table tb_small (
  id   int,
  size varchar(10),
  age  int,
  team varchar(10),
  win  varchar(10),
  dt   date,
  prob double,
  primary key(id)
);

singlestore> create table tb_medium like tb_small;
Query OK, 0 rows affected (0.21 sec)

singlestore> create table tb_big like tb_small;
Query OK, 0 rows affected (0.21 sec)

# Pipeline 각각 생성

CREATE PIPELINE p1 AS LOAD DATA S3 "s3/test.csv.gz"
CONFIG '{"endpoint_url":"https://objectstorage.ap-seoul-1.oraclecloud.com/"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE tb_small IGNORE 1 LINES
FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
WHERE size = 'small';

CREATE PIPELINE p2 AS LOAD DATA S3 "s3/test.csv.gz"
CONFIG '{"endpoint_url":"https://objectstorage.ap-seoul-1.oraclecloud.com/"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE tb_medium IGNORE 1 LINES
FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
WHERE size = 'medium';

CREATE PIPELINE p3 AS LOAD DATA S3 "s3/test.csv.gz"
CONFIG '{"endpoint_url":"https://objectstorage.ap-seoul-1.oraclecloud.com/"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE tb_big IGNORE 1 LINES
FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
WHERE size = 'big';

# Pipeline 각각 실행 : 총 로딩 건수 : 500만건

singlestore> start pipeline p1 foreground;
Query OK, 1667463 rows affected (11.10 sec)

singlestore> start pipeline p2 foreground;
Query OK, 1666384 rows affected (10.18 sec)

singlestore> start pipeline p3 foreground;
Query OK, 1666153 rows affected (10.40 sec)

2) PIPELINE ... INTO PROCEDURE 활용

# Procedure 생성 : Pipeline 으로 row 전달 받은 후 컬럼 값에 따라 각각 Insert 문 실행

DELIMITER //
CREATE OR REPLACE PROCEDURE myproc(batch QUERY(
  id int,
  size varchar(10),
  age  int,
  team varchar(10),
  win  varchar(10),
  dt   date,
  prob double))
AS
BEGIN
  INSERT INTO tb_small  SELECT * FROM batch where size = 'small';
  INSERT INTO tb_medium SELECT * FROM batch where size = 'medium';
  INSERT INTO tb_big    SELECT * FROM batch where size = 'big';
END //
DELIMITER ;

# Pipeline 생성 : Procedure 명시

CREATE PIPELINE one_pipe AS LOAD DATA S3 "s3/test.csv.gz"
CONFIG '{"endpoint_url":"https://objectstorage.ap-seoul-1.oraclecloud.com/"}'
INTO PROCEDURE myproc IGNORE 1 LINES
FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n';

# Pipeline 실행

singlestore> start pipeline one_pipe foreground;
Query OK, 5000000 rows affected (25.65 sec)

# 로딩된 테이블 조회

singlestore> select count(*) from tb_small;
+----------+
| count(*) |
+----------+
|  1667463 |
+----------+
1 row in set (0.02 sec)

singlestore> select count(*) from tb_medium;
+----------+
| count(*) |
+----------+
|  1666384 |
+----------+
1 row in set (0.03 sec)

singlestore> select count(*) from tb_big;
+----------+
| count(*) |
+----------+
|  1666153 |
+----------+
1 row in set (0.02 sec)

3) VIEW 활용

# Pipeline 으로 로딩할 테이블 생성

create table tb_all (
  id   int,
  size varchar(10),
  age  int,
  team varchar(10),
  win  varchar(10),
  dt   date,
  prob double,
  primary key(id),
  sort key(size, id)
);

# view 생성

create view v_small  as select * from tb_all where size = 'small';
create view v_medium as select * from tb_all where size = 'medium';
create view v_big    as select * from tb_all where size = 'big';

# Pipeline 생성

CREATE PIPELINE pipe AS LOAD DATA S3 "s3/test.csv.gz"
CONFIG '{"endpoint_url":"https://objectstorage.ap-seoul-1.oraclecloud.com/"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE tb_all IGNORE 1 LINES
FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n';

# Pipeline 실행

singlestore> start pipeline pipe foreground;
Query OK, 5000000 rows affected (17.54 sec)

# View 를 이용한 조회

singlestore> select 'small' as size, count(*) from v_small
    -> union all
    -> select 'medium' , count(*) from v_medium
    -> union all
    -> select 'big', count(*) from v_big;
+--------+----------+
| size   | count(*) |
+--------+----------+
| small  |  1667463 |
| medium |  1666384 |
| big    |  1666153 |
+--------+----------+
3 rows in set (0.78 sec)

마무리

SingleStoreDB 의 pipeline 및 컬럼스토어의 장점으로 인해 하나의 데이터 소스에서 여러 테이블로 각자의 데이터를 저장하고 조회에 활용할 수 있는 다양한 방법을 구현할 수 있으며 로딩 속도 및 조회 속도 또한 빠르게 처리할 수 있습니다.

profile
Database Guy

0개의 댓글