[CDC] 1. SourceDB → Connector → Kafka

masibasi·2023년 5월 16일
1

목차

1. SourceDB → Connector → Kafka

2. Kafka → JDBC connector → SinkDB1

2-1. Kafka → JDBC connector → SinkDB2

3. Spring → MySQL 연동

4. React → Spring 연동

5. TroubleShooting

0. 진행 환경

  • Macbook Air (M1) Docker

1. sourceDB → connector → Kafka

A. docker-compose.yml 작성 및 m1 기반 컨테이너 설치

version: '3'
services:
  mysql:
    image: mysql:8.0
    container_name: mysql
    ports:
      - 3306:3306
    environment:
      **MYSQL_ROOT_PASSWORD: sasd**
      MYSQL_USER: mysqluser
      MYSQL_PASSWORD: mysqlpw 
    command:
      - --character-set-server=utf8mb4
      - --collation-server=utf8mb4_unicode_ci
    **volumes:
      - /Users/jiminlee/TempCodes/CDC/mysql/data:/var/lib/mysql**

  zookeeper:
		**platform: linux/amd64/v8 //platform: linux/amd64/**
    container_name: zookeeper
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  kafka:
    container_name: kafka
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

docker-compose 이해

  • version: docker-compose 버젼을 지정한다. 여기서는 2 라고 기술했다.
  • services: docker-compose의 경우 docker 컨테이너로 수행될 서비스들은 services 하위에 기술한다.
  • hostname : container에 이름을 지정해주는 것이라고 생각할 수 있다.
  • mysql: 서비스 이름을 mysql 로 작성했다. service 하위에 작성하면 서비스 이름으로 동작한다.
  • image: 사용하는 서비스의 버전을 명시할 수 있는 부분이다.
    • 참고로 실전에서 사용하려면 latest 라는 태그를 사용하지 말고, 정확히 원하는 버젼을 기술해서 사용하길 추천한다.
    • latest라고 태그를 지정하면, 매번 컨테이너를 실행할때마다 최신버젼을 다운받아 실행하므로 변경된 버젼으로 인해 원하지 않는 결과를 볼 수 있다. (주의 !!!)
  • volume : 마운트할 볼륨을 지정한다. 로컬에 있는 directory를 container 안에 마운트 한다고 생각하면 된다.
  • ****platform :** 이미지가 어떤 플랫폼 대상인지 지정할 수 있다.
  • environment: 환경 변수를 설정할 수 있다. environment 하위에 필요한 환경을 작성하자.
    • mysqlpassword : DB root 계정의 비밀번호를 설정한다.
  • ports: kafka 브로커의 포트를 의미한다. 외부포트:컨테이너내부포트 형식으로 지정한다.
  • volumes: 컨테이너와 호스트 서버 디렉토리 연결. 여러개 설정 가능. : 기준 왼쪽이 호스트 서버, 오른쪽이 컨테이너 서버
  • kafka: kafka 브로커 이름을 지정한다.
  • depends_on: docker-compose 에서는 서비스들의 우선순위를 지정해 주기 위해서 depends_on 을 이용한다.
    • zookeeper 라고 지정하였으므로, kafka는 zookeeper이 먼저 실행되어 있어야 컨테이너가 올라오게 된다.
  • KAFKA_ZOOKEEPER_CONNECT: kafka가 zookeeper에 커넥션하기 위한 대상을 지정한다. 여기서는 zookeeper(서비스이름):2181(컨테이너내부포트) 로 대상을 지정했다.

docker-compose 파일 실행, 컨테이너 실행

docker-compose -f docker-compose.yml up -d


진행하다 위와 같이 에러가 나서 compose 파일을 수정해줬다 - #1 확인
설치된 컨테이너 확인 : docker ps -a

B. DB 설정

데이터베이스 및 테스트용 테이블 생성

mysql -u root -p

create database testdb;

use testdb;

CREATE TABLE accounts (
   account_id VARCHAR(255),
   role_id VARCHAR(255),
   user_name VARCHAR(255),
   user_description VARCHAR(255),
   update_date DATETIME DEFAULT CURRENT_TIMESTAMP,
   PRIMARY KEY (account_id)
);

MySQL Container 접속 후 - docker exec -it mysql /bin/bash

  • -it는 표준입출력을 열고 tty를 통해 접속하겠다는 의미
  • 컨테이너명 뒤에는 접속할 때 어떤쉘을 사용할지 지정할 수 있다. bash 가 표준이기에 bash 를 사용

SQL문으로 TABLE을 생성해준다.

mysql 사용자 추가 및 권한 확인

mysqluser에게 모든 권한을 부여하는 과정이다.

use mysql;

// mysqluser 가 추가 되어 있는지 확인
select host, user from user;

// mysqluser 없으면 생성
CREATE USER 'mysqluser'@'%' IDENTIFIED BY 'mysqlpw';
// mysqluser 에게 권한 부여
GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';

FLUSH PRIVILEGES;

C. Debezium Connector for MySQL 플러그인 설치

Debezium Connector 설치

https://debezium.io/releases/1.5/debezium-connector-mysql-1.5.4.Final-plugin.tar.gz을 다운로드 받는다.

/opt/kafka_2.13-2.8.1/ 경로에 미리 connectors폴더를 만들자.

로컬 컴퓨터에서 kafka 컨테이너로 debezium-connector-mysql-1.5.4.Final-plugin.tar.gz를 업로드한다. 

docker cp debezium-connector-mysql-1.5.4.Final-plugin.tar.gz kafka:/opt/kafka_2.13-2.8.1/connectors/debezium-connector-mysql-2.1.4.Final-plugin.tar.gz

파일 압축을 푼다

cd /opt/kafka_2.13-2.8.1/connectors
tar -zxvf debezium-connector-mysql-1.5.4.Final-plugin.tar.gz

plugin 경로 수정

카프카 컨테이너에 접속하여 /opt/kafka/config/connect-distributed.properties 파일을 수정한다. 파일을 수정한 뒤에는 카프카 컨테이너를 재시작해야 플러그인 경로가 정상 반영된다.

// 원래 경로
#plugin.path=

// 수정 경로
plugin.path=/opt/kafka_2.13-2.8.1/connectors

D. kafka connect 실행

Distributed Mode로 kafka connect 실행

분산모드(distributed) 카프카 커넥트를 실행한다. 분산모드는 2개 이상의 커넥트를 한 개의 클러스터를 묶어서 운영한다.

connect-distributed.sh /opt/kafka/config/connect-distributed.properties

// 정상 실행 시 INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57) 확인 가능

E. Source Connector 생성하기

Kafka Connect 클러스터 확인

다른 탭을 새로 열어 카프카에 접속 후 플러그인 목록을 조회한다. io.debezium.connector.mysql.MySqlConnector 가 있어야 한다.

curl --location --request GET 'localhost:8083/connector-plugins'

Rest API 로 connector 생성

rest api 를 호출하여 connector 를 생성하자.

curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "source-test-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "mysqluser",
    "database.password": "mysqlpw",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.allowPublicKeyRetrieval": "true",
    "database.include.list": "testdb",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.testdb",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "transforms": "unwrap,addTopicPrefix",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.addTopicPrefix.regex":"(.*)",
    "transforms.addTopicPrefix.replacement":"$1"
  }
}'

1.5.1

curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "source-test-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "mysqluser",
    "database.password": "mysqlpw",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.allowPublicKeyRetrieval": "true",
    "database.include.list": "testdb",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.testdb",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "transforms": "unwrap,addTopicPrefix",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.addTopicPrefix.regex":"(.*)",
    "transforms.addTopicPrefix.replacement":"$1",
    "topic.prefix" : "testdb",
    "schema.history.internal.kafka.topic": "schemahistory.fullfillment",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "include.schema.changes": "true"
  }
}'

https://debezium.io/documentation/reference/2.1/connectors/mysql.html#mysql-delete-events 2.1

Connector 설정 이해

이러한 설정 중 일부는 일반적인 설정으로서, 모든 커넥터에 대해 지정해야 합니다. 예를 들면 다음과 같습니다.

  • connector.class는 커넥터의 Java 클래스입니다.
  • tasks.max는 이 커넥터에 대해 생성되어야 할 태스크의 최대 수입니다.

다른 설정은 Debezium MySQL 커넥터에만 해당됩니다.

  • database.hostname은 Aurora 데이터베이스의 작성자 인스턴스 엔드포인트를 포함합니다.
  • database.server.name은 데이터베이스 서비의 논리적 이름입니다. 이 설정은 이름은 Debezium에서 생성한 Kafka 주제의 이름에 사용됩니다.
  • database.include.list는 지정한 서버에서 호스팅하는 데이터베이스의 목록을 포함합니다.
  • database.history.kafka.topic은 데이터베이스 스키마 변경을 추적하기 위해 Debezium에서 내부적으로 사용하는 Kafka 주제입니다.
  • database.history.kafka.bootstrap.servers는 MSK 클러스터의 부트스트랩 서버를 포함합니다.
  • 마지막의 8개 줄(database.history.consumer.* 및 database.history.producer.*)은 데이터베이스 기록 주제를 액세스하기 위한 IAM 인증을 활성화합니다. Amazon MSK Connect – Apache Kafka 클러스터로 데이터 전달 서비스 출시 | Amazon Web Services

connector 목록/상세 정보 확인

# 목록
curl --location --request GET 'http://localhost:8083/connectors'

# 상세정보
curl --location --request GET 'http://localhost:8083/connectors/{connector-name}/config ' \
--header 'Content-Type: application/json'

curl --location --request GET 'http://localhost:8083/connectors/source-test-connector/config ' \
--header 'Content-Type: application/json'

#삭제
curl --location --request DELETE 'http://localhost:8083/connectors/source-test-connector'

https://docs.confluent.io/platform/current/connect/references/restapi.html
https://developer.confluent.io/learn-kafka/kafka-connect/rest-api/

Topic 목록 확인

kafka-topics.sh --list --bootstrap-server localhost:9092

F. 레코드 확인

테스트 데이터 입력

INSERT INTO accounts VALUES ("123456", "111", "Susan Cooper", "God", "2021-08-16 10:11:12");
INSERT INTO accounts VALUES ("123457", "111", "Rick Ford", "mistakes", "2021-08-16 11:12:13");
INSERT INTO accounts VALUES ("123458", "999", "Bradley Fine", "face", "2021-08-16 12:13:14");

콘솔 컨슈머 확인

kafka-console-consumer.sh --topic dbserver1.testdb.accounts --bootstrap-server localhost:9092 --from-beginning

새로운 탭을 열어 카프카에서 데이터를 어떻게 받아들이고 있는지 확인해보자.

위 처럼 MySQL에서 변화가 생기는 것을 Kafka가 topic의 형태로 잘 감지하고 저장하고 있음을 확인 할 수 있다.

(2)에서 계속..

0개의 댓글