Kafka Connect Avro Converter 적용하며 스키마 레지스트리 연결하고 이를 카프카 스트림즈로 받아 처리해보기 #2

ssongkim·2023년 3월 26일
0

Converter 란

Kafka Connectsource connectorsink connector을 연결했다고 해보자. 컨넥터와 카프카 브로커 사이에 주고 받는 메시지를 어떻게 변환하여 저장할 것인지 역할을 수행하는 것이 Converter이다.

몽고DB에서 제공하는 source connector키 스키마값 스키마의 두 가지 유형의 스키마를 default로 제공한다. 그래서 mongo source connector는 값과 키가 모두 포함된 메시지를 default 스키마 형식에 맞추어 Apache Kafka로 보낸다.(이전 게시글을 통해 메시지를 직접 확인해보자)

default 스키마 코드: https://github.com/mongodb/mongo-kafka/blob/master/src/main/java/com/mongodb/kafka/connect/source/schema/AvroSchemaDefaults.java
링크타고 들어가보면 Avro 스키마를 이용해 default 스키마를 정의한 모습을 확인할 수 있다.

관련 자세한 docs: https://www.mongodb.com/docs/kafka-connector/current/source-connector/fundamentals/specify-schema/#std-label-source-default-key-schema

키 스키마Apache Kafka로 전송되는 메시지의 키 구조를 적용한다.
값 스키마Apache Kafka로 전송되는 메시지의 값 구조를 적용한다.

컨버터는 유연하게 선택하여 사용할 수 있도록 설계되어 있다. 사용할 수 있는 컨버터는 Avro Converter, Protobuf Converter 등 다양하게 존재한다.

시작하기

이전 게시글에 이어 실습을 진행해보았다.

이번 시간에는 Kafka Connectmongodb source connectorAvro Converter와 스키마 레지스트리를 연동하고 컨넥트에서 사용한 스키마를 스키마 레지스트리에 등록하고, 카프카 스트림즈를 이용해 컨슈머쪽에서 Avro 스키마로 메시지를 받아 처리하는 작업을 진행해본다.

Kafka Connect 작업 수행

1. source connector json 업데이트

이전 시간에서 넘어왔다고 해보자 굉장히 간단하다.

Avro Converter를 사용한다고 표기하고 스키마 레지스트리 주소를 넣어주면 된다.

{
  "name": "mongo-simple-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://mongo1",
    "database": "example-stream",
    "collection": "product",

// 다음 내용이 추가됨
    "output.format.value": "schema",
    "output.format.key": "schema",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

다음과 같이 json을 업데이트하고 kafka connect에 connector를 다시 등록해주자

2. 스키마 레지스트리에 API Call을 날려 스키마 등록 확인

먼저 Change Event를 감지하기 위해 몽고디비에 변화를 준다.

스키마 등록 확인

그 다음 몽고디비 컨테이너 내부에서 다음 명령어를 실행하여 스키마 레지스트리쪽으로 API Call을 날린다.

curl schema-registry:8081/subjects

다음과 같이 스키마 레지스트리에 connector가 스키마를 등록한 것 확인이 가능하다.

["example-stream.product-key","example-stream.product-value"]

스키마 버전 조회

curl schema-registry:8081/subjects/example-stream.product-value/versions

조회해보니 버전1이 존재한다.

스키마 상세조회

버전을 넣어 api call 한다.

curl schema-registry:8081/subjects/example-stream.product-value/versions/1

응답내용

{
"subject":"example-stream.product-value",
"version":1,
"id":2,
"schema":"{\"type\":\"record\",\"name\":\"ChangeStream\",\"fields\":[{\"name\":\"_id\",\"type\":\"string\"},{\"name\":\"operationType\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"fullDocument\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ns\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"ns\",\"fields\":[{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"coll\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"ns\"}],\"default\":null},{\"name\":\"to\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"to\",\"fields\":[{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"coll\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"to\"}],\"default\":null},{\"name\":\"documentKey\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"updateDescription\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"updateDescription\",\"fields\":[{\"name\":\"updatedFields\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"removedFields\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null}],\"connect.name\":\"updateDescription\"}],\"default\":null},{\"name\":\"clusterTime\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"txnNumber\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"lsid\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"lsid\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"uid\",\"type\":\"string\"}],\"connect.name\":\"lsid\"}],\"default\":null}],\"connect.name\":\"ChangeStream\"}"
}

토픽을 까보자. 무언가 Avro 형식으로 메시지가 브로커에 저장된듯하다.

Kafka Streams로 Consumer 구현

Kafka Connect 쪽에서 자신이 사용한 Avro 스키마를 스키마 레지스트리에 적재해주었음을 위에서 확인하였다.

이번 섹션에서는 해당 Avro 스키마를 스키마 레지스트리로부터 받아 메시지를 컨슘하여 kafka streams로 메시지를 처리해본다.

코틀린을 사용하였다!

1. build.gradle 설정

repositories {
    maven {
        url = uri("https://packages.confluent.io/maven/")
    }
}

dependencies {
    implementation(project(":diff-checker:diff-checker-enum"))

    // https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams
    // 카프카 브로커 지원하는지 버전 확인// https://mvnrepository.com/artifact/org.apache.kafka/connect-json
    implementation("org.apache.kafka:kafka-clients:3.4.0")
    implementation("org.apache.kafka:kafka-streams:3.4.0")

    // 카프카 버전 유의
    implementation("io.confluent:kafka-streams-avro-serde:7.3.2")
    implementation("io.confluent:kafka-schema-registry-client:7.3.2")
    implementation("io.confluent:kafka-avro-serializer:7.3.2")

    testImplementation("org.apache.kafka:kafka-streams-test-utils:3.4.0")

    // https://mvnrepository.com/artifact/org.slf4j/slf4j-api
    implementation("org.slf4j:slf4j-api:2.0.7")
    implementation("ch.qos.logback:logback-classic:1.4.6")
}

2. main문 구현

package com.example.event.diffchecker

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStream
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
import java.util.*

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
import org.apache.avro.generic.GenericRecord

/**
 * 카프카 스트림즈로 Avro Schema를 GenericRecord으로 받아 처리하는 예제
 */
fun main(args:Array<String>) {
    // env
    val applicationName = "mongo-diff-checker-app"
    val bootstrapServers = "localhost:9092"
    val productStreamTopic = "example-stream.product"
    val upsertProductTopic = "example-stream.product.upsert"
    val deleteProductTopic = "example-stream.product.delete"
    val schemaRegistryUrl = "http://localhost:8081"

    // Generic Avro Serde for a generic Avro record type
    val serdeConfig = mapOf(
        AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to schemaRegistryUrl
    )
    val genericAvroRecordSerde = GenericAvroSerde()
    genericAvroRecordSerde.configure(serdeConfig, false)

    // streams props 설정
    val props = Properties()
    props[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = schemaRegistryUrl
    props[StreamsConfig.APPLICATION_ID_CONFIG] = applicationName
    props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
    props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String()::class.java
    props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = genericAvroRecordSerde::class.java

    // stream builder 설정
    val builder = StreamsBuilder()
    val productStream: KStream<String, GenericRecord> = builder.stream(productStreamTopic)

    productStream.filter { key, value ->
        val operationType = value.get("operationType")
        operationType.toString() == "insert" || operationType.toString() == "replace" || operationType.toString() == "update"
    }.to(upsertProductTopic)

    productStream.filter { key, value ->
        val operationType = value.get("operationType")
        operationType.toString() == "delete"
    }.to(deleteProductTopic)

    // topology 구성
    val streams = KafkaStreams(builder.build(), props)

    // start
    streams.start()
}

GenericRecord는 File/String 기반의 Schema 에서 Avro Object를 생성하는 것을 말한다.  이 방법은 runtime 에서 실패할 수 있기 때문에 사용에서는 추천되는 방법은 아니지만, 쉽게 사용 할 수 있는 장점이 있다. 이 단점을 보완하려면 SpecificRecord를 써서 구현해야한다.

3. 디버깅

{"_id": "{\"_data\": \"8264200CC6000000012B022C0100296E5A100448D3A76BAF3249C2ABF0BA0B98ABFFA3463C5F6964003C61626A6A636B6B646A6A6B686B000004\"}", "operationType": "insert", "fullDocument": "{\"_id\": \"abjjckkdjjkhk\", \"mallId\": \"애플jk\", \"title\": \"매ㄱ북sjjdasd\", \"content\": \"aaa\", \"price\": \"1000\", \"imageUrl\": \"http://aa.com/imgae\", \"_class\": \"com.example.event.productcollector.api.product.domain.Product\"}", "ns": {"db": "example-stream", "coll": "product"}, "to": null, "documentKey": "{\"_id\": \"abjjckkdjjkhk\"}", "updateDescription": null, "clusterTime": "{\"$timestamp\": {\"t\": 1679822022, \"i\": 1}}", "txnNumber": null, "lsid": null}

GenericRecord로 받은 valueprintln으로 찍어 확인해보면 이렇게 의도대로 잘 나오는 것을 확인 가능하다.

마무리

게시글 2개에 걸쳐서 전반적으로 Kafka Connect, Kafka Streams, Schema Registry, Avro Schema 등을 사용해보며 카프카에 대해 전반적인 기술 습득을 할 수 있었으며 CDC, 이벤트 스트리밍 아키텍처에 대해 이해하게 되었다.

참고자료

https://www.mongodb.com/docs/kafka-connector/current/introduction/converters/

profile
鈍筆勝聰✍️

0개의 댓글