[Spring Cloud] Kafka

jsieon97·2023년 3월 20일
0
설치과정은 생략

Kafka 서버 기동

  • Kafka와 데이터를 주고받기 위해 사용하는 Java Library

  • Zookeeper 및 Kafka 서버 기동

    • $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
    • $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
  • Topic 생성

    • $KAFKA_HOME/bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 \ --partitions 1
  • Topic 목록 확인

    • $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
  • Topic 정보 확인

    • $KAFKA_HOME/bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
  • 메시지 생산

    • $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic quickstart-events
  • 메시지 소비

    • $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-events \ --from-beginning

Windows에서 기동

  • 모든 명령어는 $KAFKA_HOME\bin\windows 경로에서 .bat파일로 저장
  • .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

Kafka Connect

  • Kafka Connect를 통해 Data를 import/Export 가능
  • 코드 없이 Configuration으로 데이터를 이동
  • Standalone mode, Distribution mode 지원
    • RESTful API 통해 지원
    • Stream 또는 Batch 형태로 데이터 전송
    • 커스텀 Connector를 통한 다양한 Plugin 제공 (File, S3, Hive, Mysql, etc ...)

Kakfa Connect 이전에 mariaDB를 다운받아 OrderService에 연결

mariaDB에 테이블 생성
create database mydb;

use mydb;

create table users(
    id int auto_increment primary key,
    user_id varchar(20),
    pwd varchar(20),
    name varchar(20),
    created_at datetime default NOW()
);

create table orders (
    id int auto_increment primary key,
    product_id varchar(20) not null,
    qty int default 0,
    unit_price int default 0,
    total_price int default 0,
    user_id varchar(50) not null,
    order_id varchar(50) not null,
    created_at datetime default NOW()
   
);
OrderService에 Dependencies 추가
implementation 'org.mariadb.jdbc:mariadb-java-client:2.7.8'
implementation 'mysql:mysql-connector-java:8.0.29'
h2-console 확인

Kafka Connect 설치

// cmd 창에서 입력

curl -O http://packages.confluent.io/archive/7.3/confluent-community-7.3.1.tar.gz
tar xvf confluent-community-7.3.1.tar.gz
cd  $KAFKA_CONNECT_HOME

https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc 에서 jdbc connector설치

.\etc\kafka\connect-distributed.properties 파일을 실행해 최하단의 plugin.path수정

plugin.path={BASE_DIR}/confluentinc-kafka-connect-jdbc-10.6.3/lib

Kafka Connect 실행

// 실행 명령어

cd  $KAFKA_CONNECT_HOME
.\bin\windows\connect-distributed.bat . \etc\kafka\connect-distributed.properties

log4j 오류 발생 시
https://www.inflearn.com/questions/230919/%EC%95%88%EB%85%95%ED%95%98%EC%84%B8%EC%9A%94-connector-%EC%8B%A4%ED%96%89-%EC%8B%9C-%EC%98%A4%EB%A5%98-%EC%A7%88%EB%AC%B8%EB%93%9C%EB%A6%BD%EB%8B%88%EB%8B%A4
참고

C:\Users\사용자 이름\.gradle\caches\modules-2\files-2.1\org.mariadb.jdbc\mariadb-java-client\2.7.8

경로의 폴더 안에 있는 mariadb-java-client-2.7.8.jar 파일을 찾아서

{BASE_DIR}\confluent-7.3.1\share\java\kafka

경로로 집어 넣는다.

테스트

zookeeper, server, connect 모두 실행 후

  • Connect 생성
    POST http://127.0.0.1:8083/connectors
// JSON로 데이터 전송

{
        "name" : "my-source-connect",
        "config" : {    
            "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
            "connection.url":"jdbc:mysql://localhost:3306/mydb",
            "connection.user":"root",
            "connection.password":"{dbpassword}",
            "mode": "incrementing",
            "incrementing.column.name" : "id",
            "table.whitelist":"users",
            "topic.prefix" : "my_topic_",
            "tasks.max" : "1"
    }
}
  • Connect 체크
    GET http://127.0.0.1:8083/connectors/my-source-connect/status
  • DB에서 INSERT 실행
insert into users(user_id, pwd, name) values('user1','test1111','User name');
  • Connect 체크
    메시지가 topic my_topic_{변경된 table}에 저장된 것을 알 수 있다.

참고) https://blog.naver.com/PostView.nhn?blogId=qjawnswkd&logNo=222334228900

profile
개발자로써 성장하는 방법

0개의 댓글