Kafka와 데이터를 주고받기 위해 사용하는 Java Library
Zookeeper 및 Kafka 서버 기동
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
Topic 생성
$KAFKA_HOME/bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 \ --partitions 1
Topic 목록 확인
$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
Topic 정보 확인
$KAFKA_HOME/bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
메시지 생산
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic quickstart-events
메시지 소비
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-events \ --from-beginning
Windows에서 기동
$KAFKA_HOME\bin\windows
경로에서 .bat
파일로 저장.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
create database mydb;
use mydb;
create table users(
id int auto_increment primary key,
user_id varchar(20),
pwd varchar(20),
name varchar(20),
created_at datetime default NOW()
);
create table orders (
id int auto_increment primary key,
product_id varchar(20) not null,
qty int default 0,
unit_price int default 0,
total_price int default 0,
user_id varchar(50) not null,
order_id varchar(50) not null,
created_at datetime default NOW()
);
implementation 'org.mariadb.jdbc:mariadb-java-client:2.7.8'
implementation 'mysql:mysql-connector-java:8.0.29'
// cmd 창에서 입력
curl -O http://packages.confluent.io/archive/7.3/confluent-community-7.3.1.tar.gz
tar xvf confluent-community-7.3.1.tar.gz
cd $KAFKA_CONNECT_HOME
https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc 에서 jdbc connector설치
.\etc\kafka\connect-distributed.properties
파일을 실행해 최하단의 plugin.path
수정
plugin.path={BASE_DIR}/confluentinc-kafka-connect-jdbc-10.6.3/lib
// 실행 명령어
cd $KAFKA_CONNECT_HOME
.\bin\windows\connect-distributed.bat . \etc\kafka\connect-distributed.properties
C:\Users\사용자 이름\.gradle\caches\modules-2\files-2.1\org.mariadb.jdbc\mariadb-java-client\2.7.8
경로의 폴더 안에 있는 mariadb-java-client-2.7.8.jar
파일을 찾아서
{BASE_DIR}\confluent-7.3.1\share\java\kafka
경로로 집어 넣는다.
zookeeper, server, connect 모두 실행 후
POST
http://127.0.0.1:8083/connectors
// JSON로 데이터 전송
{
"name" : "my-source-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"{dbpassword}",
"mode": "incrementing",
"incrementing.column.name" : "id",
"table.whitelist":"users",
"topic.prefix" : "my_topic_",
"tasks.max" : "1"
}
}
GET
http://127.0.0.1:8083/connectors/my-source-connect/status
insert into users(user_id, pwd, name) values('user1','test1111','User name');
my_topic_{변경된 table}
에 저장된 것을 알 수 있다.참고) https://blog.naver.com/PostView.nhn?blogId=qjawnswkd&logNo=222334228900