데이터 동기화를 위한 KAFKA활용(1)

InSeok·2023년 3월 23일
0

Kafka Broker

  • 실행된 Kafka 애플리케이션 서버 개념
  • 3대 이상의 Broker Cluster 구성

Zookeeper

  • 이러한 브로커들을 컨트롤해주는 역할
    • 즉, 클라이언트가 서로 공유하는 데이터를 관리해주는 역할
  • 분산되어 있는 각 애플리케이션의 정보를 중앙에 집중하고 구성관리, 그룹관리 네이밍, 동기화 등을 제공

Kafka Client

  • Kafka와 데이터를 주고 받기 위해 사용하는 자바 라이브러리
  • Producer, Consumer, Admin, Stream 등 각종 API 제공
  • 다양한 Third Party Library 존재

Kafka 실행해보기

Zookeeper를 먼저 실행한 다음, Kafka 서버를 기동하자

# window 기준, zookeeper 실행$KAFKA_HOME/bin/windows/zookeeper-server-start.bat $KAFKA_HOME/config/zookeeper.properties

# kafka 서버 구동$KAFKA_HOME/bin/windows/kafka-server-start.bat $KAFKA_HOME/config/server.properties

topic 생성, 목록확인, 정보확인

# 생성 / kafka서버는 기본적으로 9092 포트를 사용하며 토픽의 파티션은 1개로 가정$KAFKA_HOME/bin/windows/kafka-topics.bat --create --topic 토픽명 --bootstrap-server localhost:9092 --partitions 1

# 목록 확인$KAFKA_HOME/bin/windows/kafka-topics.bat --bootstrap-server localhost:9092 --list

# 정보 확인$KAFKA_HOME/bin/windows/kafka-topics.bat --describe --topic 토픽명 --bootstrap-server localhost:9092

메세지를 생산하는 producer, 소비하는 consumer 기동하기

# 생산$KAFKA_HOME/bin/windows/kafka-console-producer.bat --broker-list localhost:9092 --topic 앞에서설정한토픽명

# 소비, from-beginning 해당 토픽의 처음부터 메세지를 받아오기 위한 옵션$KAFKA_HOME/bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic 앞에서설정한토픽명 --from-beginning

Kafka Connect

  • Kafka Connect를 통해 Data를 import/export 가능하다
  • 코드 없이 Configuration으로 데이터를 이동
  • Restful api를 통해 지원
  • Stream or Batch 형태로 데이터 전송 가능
  • 커스텀 Connector를 통한 다양한 plugin 제공 (DB를 적용해 볼 예정)

  • Kafka Connect Source : 특정 리소스에서 데이터를 가져와 카프카 클러스터에 가져오는걸 개입한다.(import)
  • Kafka Connect Sink : Cluster에 저장되어 있는 데이터를 다른 쪽으로 보내는데 개입한다.(export)

Kafka Connect 실행해보기

Kafka Connect 실행

$KAFKA_CONNECT_HOME/bin/windows/connect-distributed.bat ./etc/kafka/connect-distributed.properties

설정 정보 수정

# rem classpath addition for core 위에 삽입
rem classpath addition for LSB style path
if exist %BASE_DIR%\share\java\kafka\* (
    call:concat %BASE_DIR%\share\java\kafka\*
)
# mariadb 연동을 위해 jdbc 추가# 위치 : \etc\kafka\connect-distributed.propertiesplugin.path=[confluentinc-kafka-connect-jdbc-10.0.1 폴더]

# 이후 kafka connect jdbc maridb 드라이버 파일을 confluent-6.1.0/share/java/kafka 에 복사

connect 연결 이후 토픽을 확인하면

이렇게 세 개의 토픽이 추가된 것을 확인할 수 있다. 커넥트가 소스에서 읽어왔던 데이터를 저장하고 관리하기 위한 토픽들이다.

Soure Connect 추가

# connect의 기본 포트는 8083
# 커넥트 추가, POST로 전송
{
"name" : "my-source-connect",
"config" : {
    "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url":"jdbc:mysql://localhost:3306/mydb",
    "connection.user":"root",
    "connection.password":"test1357",
    "mode": "incrementing",
    "incrementing.column.name" : "id",
    "table.whitelist":"users",        # 감지할 테이블명
    "topic.prefix" : "my_topic_",    # 저장될 토픽의 prefix 최종으로 만들어진 토픽 : my_topic_users
    "tasks.max" : "1"
    }
}

my_topic_users가 토픽으로 등록된 것을 확인할 수 있다. 이후, 마리아 db에 users 테이블을 만들어 데이터를 삽입 후 확인해보면

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "user_id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "pwd"
      },
      {
        "type": "string",
        "optional": true,
        "field": "name"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "created_at"
      }
    ],
    "optional": false,
    "name": "users"
  },
  "payload": {
    "id": 1,
    "user_id": "hel",
    "pwd": "1234",
    "name": "ko",
    "created_at": 1621788132000
  }
}

Schema에 fields는 각 컬럼 정보들이 저장되어 있고, payload에 컬럼별 저장된 데이터 값들이 있다.

Sink Connect 추가

{
    "name":"my-sink-connect",
    "config": {
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:mysql://localhost:3306/mydb",
        "connection.user":"root",
        "connection.password":"test1357",
        "auto.create":"true",  # DB를 자동으로 만들기 설정, 토픽과 같은 이름의 테이블 생성
        "auto.evolve":"true",
        "delete.enabled":"false",
        "tasks.max":"1",
        "topics":"my_topic_users"    # 정보를 받을 토픽
    }
}

본 내용은 이도원님의 Spring Cloud로 개발하는 마이크로서비스 애플리케이션을 수강하고 정리한 내용입니다.

profile
백엔드 개발자

0개의 댓글