Kafka ElasticSearch Sink Connector 설정

S_H_H·2024년 7월 9일
0

Kafka 미니 플젝

목록 보기
5/7
post-thumbnail

시간을 젤 잡아먹은 Sink Connector 설정 방법 window라서 그랬을까
에러 발생 시 데이터가 쌓이는 Topic에서 데이터 시각화를 위한 ElasticSearch 연동을 시작

ElasticSearch Connector

ElasticSearch Sink Connector 다운로드 최신 버전인 14.1.0를 다운 받았습니다.

전 아래 경로로 다운 받은 파일을 옮겼습니다.
다른 경로여도 상관없습니다.

C:\kafka_2.12-2.5.0\plugin\confluentinc-kafka-connect-elasticsearch-14.1.0

Sink Connector 설정

connect-distributed.properties

  • 포트 변경으로 인해 서버 기입
  • value는 Json 받기에 수정
  • converter는 false 처리
  • plugin.path는 다운로드 받은 경로 기입
bootstrap.servers=localhost:9093
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
plugin.path=/kafka_2.12-2.5.0/plugin/confluentinc-kafka-connect-elasticsearch-14.1.0/lib/

Sink Connector 실행

아래 명령어로 서버 실행

.\bin\windows\connect-distributed.bat .\config\connect-distributed.properties

만약 서버 시작하면서 아래와 같은 에러 ReflectionsException 발생한다고 해도 괜찮습니다.
로그 설정으로 WARN이 보이지 않도록 수정하거나 Kafka v3.7.1으로 기동 시 표출이 되지 않았습니다.

WARN could not get type for name org.osgi.framework.BundleActivator from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.osgi.framework.BundleActivator

서버가 올라갔다면은 Plugins 확인을 해보시면 됩니다.

  • Kafka 3.7.1 버전에서는 시작 시 log에서도 확인이 가능했습니다.

아래 사진 처럼 ElasticSearchSinkConnector가 보인다면은 적용이 되었다는걸 알 수 있습니다.

Window Sink Connector plugin 인식이 안되시는 분

connect-distributed.properties > plugin.path 경로 확인
C:\kafka_2.12-2.5.0\ 가 아닌 /kafka_2.12-2.5.0 로 시작하게 기입하세요
이때 부터 모든 경로를 절대 경로로 기입하기 시작

다른 사용자보면 C: 부터 기입해도 문제가 없던데 저는 안되더라구요......

Connector 추가

다른 분들은 curl 명령어를 통해서 기입을 많이 하시던데
window 11 의 cmd든 powershell이든 사용하기가 참 불편해서 저는 Postman 을 사용 했습니다

헤더는 application/json 입니다.
body

{
    "name": "application_exception_kafka_sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "management_car_exception_topic_1",
        "connection.url": "http://localhost:9200",
        "type.name": "kafka-connect"
    }
}

등록이 잘 되었다면 Connector 확인 통해 등록된 정보를 확인할 수 있습니다.

Window Connector 추가 안되시는 분

NoClassDefFoundError
ClassNotFoundException
ElasticsearchStatusException

위와 같이 에러가 발생하면서 등록이 되지 않는다면, 서버 올릴때 Plugin이 잘 긁어가는지 확인해볼 필요가 있습니다.
저도 동일하게 Plugins 에서 보이길래 뭐가 문제인지 돌고 돌아왔습니다.

connect-distributed.bat 파일을 보시면 connect-distributed.properties 설정과 함께 kafka-run-class.bat 을 호출하고 있습니다.

kafka-run-class.bat을 보면 많은 lib 파일을 읽고 있습니다.

rem Classpath addition for kafka-clients
for %%i in ("%BASE_DIR%\clients\build\libs\kafka-clients*.jar") do (
	call :concat "%%i"
)

rem Classpath addition for kafka-streams
for %%i in ("%BASE_DIR%\streams\build\libs\kafka-streams*.jar") do (
	call :concat "%%i"
)

Classpath addition for release
for %%i in ("%BASE_DIR%\libs\*") do (
call :concat "%%i"
)

.
.
.

set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*

마지막 실행 명령어를 직접 찍어보면은

C:\Program Files\Java\jdk-17/bin/java" -Xmx256M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir="C:\kafka_2.12-2.5.0/logs" "-Dlog4j.configuration=file:C:\kafka_2.12-2.5.0/config/tools-log4j.properties" -cp "C:\Program Files\Java\jdk-17\lib;"C:\kafka_2.12-2.5.0\libs\*;""  org.apache.kafka.connect.cli.ConnectDistributed .\config\connect-distributed.properties

connect-distributed.properties가 붙어있길래 plugin.path 의 경로를 잘 읽어갈 줄 알았지만...... 아니더라구요
그래서 plugin.path가 잘못된건가 싶어서 해당 경로를 수정하면은 Plugins에서 보이지 않게되고

그래서 돌고 돌아 kafka-run-class.bat 파일에 plugin lib를 읽도록 강제하였습니다.
위에서 보이는 방식대로 동일하게 했지만

Classpath addition for release
for %%i in ("%BASE_DIR%\plugin\confluentinc-kafka-connect-elasticsearch-14.1.0\lib\*) do (
call :concat "%%i"
)

이제는 명령어 줄이 너무 길다고 실행자체가 안되더라구요
그래서 결론은 이렇게 해결했습니다.

call :concat "%BASE_DIR%\plugin\confluentinc-kafka-connect-elasticsearch-14.1.0\lib\*;

여러분은 이런일이 없기리 바랍니다..

profile
LEVEL UP

0개의 댓글