시간을 젤 잡아먹은 Sink Connector 설정 방법 window라서 그랬을까
에러 발생 시 데이터가 쌓이는 Topic에서 데이터 시각화를 위한 ElasticSearch 연동을 시작
ElasticSearch Sink Connector 다운로드 최신 버전인 14.1.0를 다운 받았습니다.
전 아래 경로로 다운 받은 파일을 옮겼습니다.
다른 경로여도 상관없습니다.
C:\kafka_2.12-2.5.0\plugin\confluentinc-kafka-connect-elasticsearch-14.1.0
connect-distributed.properties
bootstrap.servers=localhost:9093
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
plugin.path=/kafka_2.12-2.5.0/plugin/confluentinc-kafka-connect-elasticsearch-14.1.0/lib/
아래 명령어로 서버 실행
.\bin\windows\connect-distributed.bat .\config\connect-distributed.properties
만약 서버 시작하면서 아래와 같은 에러 ReflectionsException
발생한다고 해도 괜찮습니다.
로그 설정으로 WARN이 보이지 않도록 수정하거나 Kafka v3.7.1으로 기동 시 표출이 되지 않았습니다.
WARN could not get type for name org.osgi.framework.BundleActivator from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.osgi.framework.BundleActivator
서버가 올라갔다면은 Plugins 확인을 해보시면 됩니다.
아래 사진 처럼 ElasticSearchSinkConnector가 보인다면은 적용이 되었다는걸 알 수 있습니다.
C:\kafka_2.12-2.5.0\ 가 아닌 /kafka_2.12-2.5.0 로 시작하게 기입하세요
이때 부터 모든 경로를 절대 경로로 기입하기 시작
다른 사용자보면 C: 부터 기입해도 문제가 없던데 저는 안되더라구요......
다른 분들은 curl 명령어를 통해서 기입을 많이 하시던데
window 11 의 cmd든 powershell이든 사용하기가 참 불편해서 저는 Postman
을 사용 했습니다
헤더는 application/json
입니다.
body
{
"name": "application_exception_kafka_sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "management_car_exception_topic_1",
"connection.url": "http://localhost:9200",
"type.name": "kafka-connect"
}
}
등록이 잘 되었다면 Connector 확인 통해 등록된 정보를 확인할 수 있습니다.
NoClassDefFoundError
ClassNotFoundException
ElasticsearchStatusException
위와 같이 에러가 발생하면서 등록이 되지 않는다면, 서버 올릴때 Plugin이 잘 긁어가는지 확인해볼 필요가 있습니다.
저도 동일하게 Plugins 에서 보이길래 뭐가 문제인지 돌고 돌아왔습니다.
connect-distributed.bat
파일을 보시면 connect-distributed.properties
설정과 함께 kafka-run-class.bat
을 호출하고 있습니다.
kafka-run-class.bat
을 보면 많은 lib 파일을 읽고 있습니다.
rem Classpath addition for kafka-clients
for %%i in ("%BASE_DIR%\clients\build\libs\kafka-clients*.jar") do (
call :concat "%%i"
)
rem Classpath addition for kafka-streams
for %%i in ("%BASE_DIR%\streams\build\libs\kafka-streams*.jar") do (
call :concat "%%i"
)
Classpath addition for release
for %%i in ("%BASE_DIR%\libs\*") do (
call :concat "%%i"
)
.
.
.
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*
마지막 실행 명령어를 직접 찍어보면은
C:\Program Files\Java\jdk-17/bin/java" -Xmx256M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir="C:\kafka_2.12-2.5.0/logs" "-Dlog4j.configuration=file:C:\kafka_2.12-2.5.0/config/tools-log4j.properties" -cp "C:\Program Files\Java\jdk-17\lib;"C:\kafka_2.12-2.5.0\libs\*;"" org.apache.kafka.connect.cli.ConnectDistributed .\config\connect-distributed.properties
connect-distributed.properties
가 붙어있길래 plugin.path 의 경로를 잘 읽어갈 줄 알았지만...... 아니더라구요
그래서 plugin.path가 잘못된건가 싶어서 해당 경로를 수정하면은 Plugins에서 보이지 않게되고
그래서 돌고 돌아 kafka-run-class.bat
파일에 plugin lib를 읽도록 강제하였습니다.
위에서 보이는 방식대로 동일하게 했지만
Classpath addition for release
for %%i in ("%BASE_DIR%\plugin\confluentinc-kafka-connect-elasticsearch-14.1.0\lib\*) do (
call :concat "%%i"
)
이제는 명령어 줄이 너무 길다고 실행자체가 안되더라구요
그래서 결론은 이렇게 해결했습니다.
call :concat "%BASE_DIR%\plugin\confluentinc-kafka-connect-elasticsearch-14.1.0\lib\*;
여러분은 이런일이 없기리 바랍니다..