[CDC] (2) Kafka -> JDBC connector -> SinkDB

masibasi · May 16, 2023

2. Kafka → JDBC connector → SinkDB

A. Create the SinkDB container (write docker-compose.yml)

mysql-sink:
    image: mysql:8.0
    container_name: mysql-sink
    ports:
      - 3307:3306
    environment:
      MYSQL_ROOT_PASSWORD: sasd
      MYSQL_USER: mysqluser
      MYSQL_PASSWORD: mysqlpw 
    command:
      - --character-set-server=utf8mb4
      - --collation-server=utf8mb4_unicode_ci
    volumes:
      - /Users/jiminlee/TempCodes/CDC/mysql-sink/data:/var/lib/mysql

docker-compose -f docker-compose.yml up -d

Modifying the docker-compose file makes it easy to spin up a container for the MySQL sink.
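To make sure the container actually came up before moving on, a quick check (using the container name and port mapping from the compose file above):

docker ps --filter name=mysql-sink
# mysql-sink should be listed with 0.0.0.0:3307->3306/tcp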

B. Configure the DB

Create the database and a test table

mysql -u root -p

create database sinkdb;

use sinkdb;

CREATE TABLE accounts (
   account_id VARCHAR(255),
   role_id VARCHAR(255),
   user_name VARCHAR(255),
   user_description VARCHAR(255),
   update_date DATETIME DEFAULT CURRENT_TIMESTAMP,
   PRIMARY KEY (account_id)
);

docker exec -it mysql-sink /bin/bash

Add the MySQL user and check its privileges

use mysql;

-- Check whether mysqluser already exists
select host, user from user;

-- Create mysqluser if it does not exist
CREATE USER 'mysqluser'@'%' IDENTIFIED BY 'mysqlpw';
-- Grant privileges to mysqluser
GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';

FLUSH PRIVILEGES;
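As an optional sanity check, confirm the grants took effect:

SHOW GRANTS FOR 'mysqluser'@'%';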

C. Install the Kafka JDBC Connector (Source and Sink)

Install the JDBC Connector

https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

Download it from the link above, then upload it into the Kafka container.

# Upload the file into the Kafka container
docker cp confluentinc-kafka-connect-jdbc-10.7.0.zip kafka:/opt/kafka_2.13-2.8.1/connectors/
# Then, inside the Kafka container (docker exec -it kafka bash):
cd /opt/kafka_2.13-2.8.1/connectors
unzip confluentinc-kafka-connect-jdbc-10.7.0.zip
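To confirm the plugin actually landed where the worker will look for it, you can list the extracted directory (the layout below assumes the structure of the Confluent Hub zip):

ls confluentinc-kafka-connect-jdbc-10.7.0/lib
# the kafka-connect-jdbc jar should be among the listed files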

Check the plugin path

When installing the source connector, I already updated the plugin path in /opt/kafka/config/connect-distributed.properties.
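For reference, the relevant line looks roughly like this; the exact value is whatever path was set when installing the source connector, which here would be the connectors directory used above:

# /opt/kafka/config/connect-distributed.properties
plugin.path=/opt/kafka_2.13-2.8.1/connectors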

D. Run Kafka Connect

connect-distributed.sh /opt/kafka/config/connect-distributed.properties
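The worker runs in the foreground; if you want it to survive closing the shell, something like this works (the log file path is arbitrary):

nohup connect-distributed.sh /opt/kafka/config/connect-distributed.properties > /tmp/connect.log 2>&1 &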

E. Create the Sink Connector

Let's check the Kafka Connect cluster information: the worker version, commit, and the Kafka cluster ID.

curl http://localhost:8083/

Check the MySQL connector plugins

curl --location --request GET 'localhost:8083/connector-plugins'


Both io.confluent.connect.jdbc.JdbcSinkConnector and io.confluent.connect.jdbc.JdbcSourceConnector should be present.
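If the JSON output is hard to scan, a rough filter (assuming grep is available) makes the two classes easy to spot:

curl -s localhost:8083/connector-plugins | grep -o 'io.confluent.connect.jdbc.Jdbc[A-Za-z]*Connector'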

Create the connector via the REST API. With "topics.regex", every topic matching the pattern is sunk; to sink only a single topic, specify "topics" instead (e.g. "dbserver1.testdb.accounts"). Also note that this config drops tombstones ("transforms.unwrap.drop.tombstones": "true"); to make deletes propagate to the sink, use "false" together with "transforms.unwrap.delete.handling.mode": "rewrite", as the SinkDB2 connector below does (see Trouble Shooting #5).

curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "sink-test-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://mysql-sink:3306/sinkdb?user=mysqluser&password=mysqlpw",
    "auto.create": "false",
    "auto.evolve": "false",
    "delete.enabled": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "table.name.format":"${topic}",
    "tombstones.on.delete": "true",
    "connection.user": "mysqluser",
    "connection.password": "mysqlpw",
    "topics.regex": "dbserver1.testdb.(.*)",
//    "topics": "dbserver1.testdb.accounts", <- 하나만 명시
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "transforms": "unwrap, route, TimestampConverter",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
//  "transforms.unwrap.drop.tombstones": "false",
// "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
    "transforms.TimestampConverter.target.type": "Timestamp",
    "transforms.TimestampConverter.field": "update_date"
  }
}'
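Once the POST returns successfully, it is worth checking that the connector and its task are actually in the RUNNING state:

curl --location --request GET 'http://localhost:8083/connectors/sink-test-connector/status'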

Detailed configuration options

https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html#connection

Kafka Connect REST API

# List connectors
curl --location --request GET 'http://localhost:8083/connectors'

# Connector details
curl --location --request GET 'http://localhost:8083/connectors/sink-test-connector/config' \
--header 'Content-Type: application/json'

# Delete a connector
curl --location --request DELETE 'http://localhost:8083/connectors/sink-test-connector'

F. Verify the MySQL sink


You can confirm that the sink DB has been populated from the topics that already existed on Kafka.
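To check from the terminal instead of a GUI client, you can query the sink directly (user and password come from the compose file above):

docker exec mysql-sink mysql -umysqluser -pmysqlpw sinkdb -e "SELECT * FROM accounts;"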

ADD

INSERT INTO accounts VALUES ("111111", "111", "Jimin", "ADD", "2021-08-16 10:11:12");
INSERT INTO accounts VALUES ("222222", "222", "Lee", "FROM", "2021-08-16 11:12:13");
INSERT INTO accounts VALUES ("333333", "333", "Test", "SOURCE", "2021-08-16 12:13:14");

UPDATE

UPDATE accounts SET user_name = 'UPDATE!!!' WHERE account_id = '111111';

DELETE ← Trouble Shooting #5

delete from accounts where account_id = 0;

2.1 TargetDB2: Create SinkDB2 and a custom table

A. Create the MySQL tables and set privileges

mysql -u root -p

create database testdb;

use testdb;

CREATE TABLE CUSTOM_TABLE (
   title VARCHAR(255),
   name VARCHAR(255),
   id VARCHAR(255),
   PRIMARY KEY (id)
);
# On the sink side (the container the SinkDB2 connector below reaches as mysql-sink2), connect again
mysql -u root -p

-- Privilege setup
use mysql;

-- Check whether mysqluser already exists
select host, user from user;

-- Create mysqluser if it does not exist
CREATE USER 'mysqluser'@'%' IDENTIFIED BY 'mysqlpw';
-- Grant privileges to mysqluser
GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';

FLUSH PRIVILEGES;

-- Create the database and table
create database sinkdb2;

use sinkdb2;

CREATE TABLE CUSTOM_TABLE (
   title VARCHAR(255),
   name VARCHAR(255),
   id VARCHAR(255),
   PRIMARY KEY (id)
);

B. Create the SinkDB2 Connector

The important point is to configure topics so that only CUSTOM_TABLE is consumed. Compared with the first connector, this one enables "auto.evolve", names the single topic dbserver1.testdb.CUSTOM_TABLE via "topics" instead of a regex, and keeps tombstones ("drop.tombstones": "false" with "delete.handling.mode": "rewrite") so deletes propagate. If you are unsure of the exact topic name, you can list the topics first, as in the snippet below.
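A sketch for listing the topics from inside the Kafka container; the broker address localhost:9092 is an assumption, adjust it to your listener:

kafka-topics.sh --bootstrap-server localhost:9092 --list | grep CUSTOM_TABLE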

curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "sink-test2-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://mysql-sink2:3306/sinkdb2?user=mysqluser&password=mysqlpw",
    "auto.create": "false",
    **"auto.evolve": "true",**
    "delete.enabled": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "table.name.format":"${topic}",
    "tombstones.on.delete": "true",
    "connection.user": "mysqluser",
    "connection.password": "mysqlpw",
    **"topics": "dbserver1.testdb.CUSTOM_TABLE",**
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "transforms": "unwrap, route, TimestampConverter",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
    "transforms.TimestampConverter.target.type": "Timestamp",
    "transforms.TimestampConverter.field": "update_date"
  }
}'

C. Run an INSERT and confirm the event is stored in the topic with a Kafka consumer
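A minimal end-to-end check might look like the following; the row values are made up, and the broker address and the mysql-sink2 container name are assumptions based on the connector config above:

-- On the source DB (testdb): insert a test row, column order is title, name, id
INSERT INTO CUSTOM_TABLE VALUES ('My Title', 'Jimin', 'id-001');

# In the Kafka container: watch the CDC topic for the new event
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbserver1.testdb.CUSTOM_TABLE --from-beginning

# On the sink side: the row should appear in sinkdb2.CUSTOM_TABLE
docker exec mysql-sink2 mysql -umysqluser -pmysqlpw sinkdb2 -e "SELECT * FROM CUSTOM_TABLE;"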

