mysql:
image: debezium/example-mysql:1.6
container_name: mysql
volumes:
- ./mysql/data:/var/lib/mysql
ports:
- "3306:3306"
env_file:
- .env
.env
파일MYSQL_ROOT_PASSWORD="debezium"
MYSQL_USER="admin"
MYSQL_PASSWORD="admin123"
여기서는 debezium이 데이터에 동기적으로 액세스할 수 있도록 이미 CDC 설정이 있는 debezium/example-mysql:1.6이라는 이름의 도커 이미지를 사용한다.
많은 명령을 입력하는 데 시간을 낭비하고 싶지 않기 때문에 시스템과 상호 작용할 수 있는 명령의 바로 가기가 포함된 Makefile 파일을 사용했습니다.
include .env
build:
docker-compose build
up:
docker-compose --env-file .env up -d
down:
docker-compose --env-file .env down
ps:
docker ps --format "table {{.Names}}\t{{.Status}}\t{{.Ports}}"
to_redpanda:
open http://localhost:8080/topics
to_minio:
open http://localhost:9001/buckets
to_mysql:
docker exec -it mysql mysql -u"root" -p"${MYSQL_ROOT_PASSWORD}" ${MYSQL_DATABASE}
to_data_generator:
docker exec -it data_generator /bin/bash
make up
# checking redpanda console
make to_redpanda
# checking minio
make to_minio
주문 거래 데이터를 저장하기 위해 olistorders_dataset 테이블을 사용하여 브라질전자상거래 데이터베이스를 생성한다.
make up
명령어를 실행하여 MySQL 서비스를 빌드하고 make
to_mysql
명령어를 실행하여 MySQL에 접속한 후 아래와 같이 명령어를 실행src
├── 01_generate_orders.py
├── 02_generate_clickstream.py
├── data
│ └── olist_orders_dataset.csv
├── Dockerfile
├── requirements.txt
└── setup_connectors.sh
data-generator:
build:
context: ./src
dockerfile: ./Dockerfile
container_name: data_generator
volumes:
- ./src:/opt/src
env_file:
- .env
다.
make build
make down
make up
make to_data_generator
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt
python src/01_generate_orders.py
💡 ERROR:
패키지가 시스템 전체 Python에 이미 설치되어 있으면 가상 환경에 다시 설치되지 않고 요구 사항이 이미 충족되었음을 알리는 메시지가 표시됩니다.
이를 해결하기 위해 pip install
명령을 실행할 때 --ignore-installed
플래그를 사용하였습니다. 이 플래그는 시스템 전체 Python에 이미 패키지가 설치되어 있어도 pip가 패키지를 다시 설치하도록 강제합니다.
01_generated_orders.py
파일의 MySQL HOST 정보를 수정한다.(74254, 9)
NO. DATES: 366
Writing data on: 2017-08-01
-Records: 165
Writing data on: 2017-08-02
-Records: 157
Writing data on: 2017-08-03
-Records: 148
redpanda:
image: vectorized/redpanda
container_name: redpanda
ports:
- "9092:9092"
- "29092:29092"
command:
- redpanda
- start
- --overprovisioned
- --smp
- "1"
- --memory
- "1G"
- --reserve-memory
- "0M"
- --node-id
- "0"
- --kafka-addr
- PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
- --advertise-kafka-addr
- PLAINTEXT://redpanda:29092,OUTSIDE://redpanda:9092
- --check=false
redpanda-console:
image: vectorized/console
container_name: redpanda_console
depends_on:
- redpanda
ports:
- "8080:8080"
env_file:
- .env
io.confluent.connect.s3.S3SinkConnector
가 없으므로, Kafka 연결용 커넥터를 다운로드하려면 Dockerfile로 kafka/ 폴더를 만들어야 한다.FROM debezium/connect
RUN curl -O https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connects3/versions/10.3.1/confluentinc-kafka-connect-s3-10.3.1.zip \
&& unzip confluentinc-kafka-connect-s3-10.3.1.zip \
&& mv confluentinc-kafka-connect-s3-10.3.1 /kafka/connect/ \
&& rm confluentinc-kafka-connect-s3-10.3.1.zip
We add docker-compose.yml with declarations like below
kafka-connect:
build:
context: ./kafka
dockerfile: ./Dockerfile
container_name: kafka_connect
depends_on:
- redpanda
ports:
- "8083:8083"
env_file:
- .env
{'class': 'io.confluent.connect.s3.S3SinkConnector', 'type': 'sink', 'version': '10.3.1'},
{'class': 'io.confluent.connect.storage.tools.SchemaSourceConnector', 'type': 'source', 'version': '3.4.0'},
{'class': 'io.debezium.connector.db2.Db2Connector', 'type': 'source', 'version': '2.2.0.Alpha3'},
{'class': 'io.debezium.connector.mongodb.MongoDbConnector', 'type': 'source', 'version': '2.2.0.Alpha3'},
{'class': 'io.debezium.connector.mysql.MySqlConnector', 'type': 'source', 'version': '2.2.0.Alpha3'},
{'class': 'io.debezium.connector.oracle.OracleConnector', 'type': 'source', 'version': '2.2.0.Alpha3'},
{'class': 'io.debezium.connector.postgresql.PostgresConnector', 'type': 'source', 'version': '2.2.0.Alpha3'},
{'class': 'io.debezium.connector.spanner.SpannerConnector', 'type': 'source', 'version': '2.2.0.Alpha3'},
{'class': 'io.debezium.connector.sqlserver.SqlServerConnector', 'type': 'source', 'version': '2.2.0.Alpha3'},
{'class': 'io.debezium.connector.vitess.VitessConnector', 'type': 'source', 'version': '2.2.0.Alpha3'},
{'class': 'org.apache.kafka.connect.mirror.MirrorCheckpointConnector', 'type': 'source', 'version': '3.4.0'},
{'class': 'org.apache.kafka.connect.mirror.MirrorHeartbeatConnector', 'type': 'source', 'version': '3.4.0'},
{'class': 'org.apache.kafka.connect.mirror.MirrorSourceConnector', 'type': 'source', 'version': '3.4.0'}
io.debezium.connector.mysql.MySqlConnector
입니다.curl --request POST \
--url http://localhost:8083/connectors \
--header 'Content-Type: application/json' \
--data '{
"name": "src-brazillian-ecommerce",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.include.list": "brazillian_ecommerce",
"topic.prefix": "dbserver1",
"schema.history.internal.kafka.bootstrap.servers": "redpanda:9092",
"schema.history.internal.kafka.topic": "schema-changes.brazillian_ecommerce"
}
}'
요청이 성공하면 Redpanda 콘솔로 돌아가면 디비지움 관련 토픽 이 자동으로 생성된 것을 확인할 수 있다.
dbserver1.brazilian_ecommerce.olist_orders_dataset
항목을 확인하면 CDC 메시지의 전체 내용을 볼 수 있다.
minio:
hostname: minio
image: "minio/minio"
container_name: minio
ports:
- "9001:9001"
- "9000:9000"
command: [ "server", "/data", "--console-address", ":9001" ]
volumes:
- ./minio/data:/data
env_file:
- .env
mc:
image: minio/mc
container_name: mc
hostname: mc
environment:
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=minio123
- AWS_REGION=us-east-1
entrypoint: >
/bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo '...waiting...' && sleep 1; done; /usr/bin/mc mb minio/warehouse; /usr/bin/mc policy set public minio/warehouse; exit 0; "
depends_on:
- minio
.env
file# MinIO
MINIO_ROOT_USER="minio"
MINIO_ROOT_PASSWORD="minio123"
MINIO_ACCESS_KEY="minio"
MINIO_SECRET_KEY="minio123"
io.confluent.connect.s3.S3SinkConnector
, you can find here .curl --request POST \
--url http://localhost:8083/connectors \
--header 'Content-Type: application/json' \
--data '{
"name": "sink-s3-brazillian-ecommerce",
"config": {
"topics.regex": "dbserver1.brazillian_ecommerce.*",
"topics.dir": "brazillian_ecommerce",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"flush.size": "1000",
"store.url": "http://minio:9000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.region": "us-east-1",
"s3.bucket.name": "warehouse",
"aws.access.key.id": "minio",
"aws.secret.access.key": "minio123"
}
}'
We do the same by accessing the data_generator container via make to_data_generator. Then use python to run the script 02_generate_clickstream.py when successful we will see the log as below
At this point, when checking with the Redpanda console, we will see that a topic has clickstream_events been created, along with events captured on the system.
Finally, to sink data about MinIO, register a connector on Kafka connect by requesting as below
curl --request POST \
--url http://localhost:8083/connectors \
--header 'Content-Type: application/json' \
--data '{
"name": "sink-s3-clickstream",
"config": {
"topics": "clickstream_events",
"topics.dir": "clickstream_events",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"key.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"s3.compression.type": "gzip",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"flush.size": "100",
"store.url": "http://minio:9000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.region": "us-east-1",
"s3.bucket.name": "warehouse",
"aws.access.key.id": "minio",
"aws.secret.access.key": "minio123"
}
}'
Yes, finally clickstream data has been synced to MinIO
한 시스템에서 다른 시스템으로 데이터를 마이그레이션하는 것은 시간이 많이 걸리고 일괄 처리 수집 방향으로 이동하면 원래 시스템에 정체가 발생합니다. 새 데이터를 얻기 위해 데이터베이스에 대한 요청이 쇄도해야 하기 때문입니다. 실제로 1-2개의 테이블이 중요하지 않은 경우 보고서 분석을 수행하거나 기계 학습 모델을 제공할 수 있으려면 더 많은 테이블이 필요합니다.
스트림 데이터 수집은 CDC를 통해 추적된 변경 사항 덕분에 Steam에서만 로그 데이터를 읽기 때문에 소스 시스템의 부하를 줄이는 것과 같은 더 많은 이점이 있으며, 이 신뢰할 수 있는 소스에 전적으로 의존하여 다른 많은 시스템의 모든 데이터를 재현하고 재생성할 수 있습니다.
데이터 마이그레이션 프로세스는 Data Engineer의 모든 데이터 소비자에게 데이터를 제공하는 프로세스의 첫 번째 단계일 뿐입니다. 남은 작업은 사용자가 마지막에 전체 데이터를 볼 수 있도록 CDC에서 어떻게 재구성할 수 있는지입니다. 다음 기사에서는 이 CDC 데이터를 읽는 데 일반적으로 사용되는 두 가지 주요 알고리즘인 쓰기 시 복사 및 읽기 시 병합 .