GET /connectors/
connector 목록 조회
GET /connectors?expand=status&expand=info
connector 상태, 정보 상세 조회
특정 커넥터만 보고 싶은 경우에는 /connectors 하위에 커넥터 이름 붙임
{
"kafka-connect": {
"status": {
"name": "kafka-connect",
"connector": {
"state": "RUNNING",
"worker_id": "localhost:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "localhost:8083"
}
],
"type": "sink"
},
"info": {
"name": "kafka-connect",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type.name": "_doc",
"behavior.on.null.values": "delete",
"auto.create.indices.at.start": "false",
"connection.password": "비밀번호",
"topics": "토픽이름",
"tasks.max": "1",
"connection.username": "유저",
"plugin.path": "/usr/share/confluent-hub-components/confluentinc-kafka-connect-elasticsearch/lib",
"compact.map.entries": "true",
"key.ignore": "true",
"schema.ignore": "true",
"behavior.on.malformed.documents": "ignore",
"offset.flush.interval.ms": "5000",
"key.converter.schemas.enable": "false",
"topic.index.map": "토픽이름:인덱스 이름",
"name": "kafka-connect",
"value.converter.schemas.enable": "false",
"connection.url": "접속 정보",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter"
},
"tasks": [
{
"connector": "kafka-connect",
"task": 0
}
],
"type": "sink"
}
}
}
GET /connctors/커넥터 이름/config
구성 정보 조회
DELETE /connectors/커넥터 이름
connector 삭제
GET /connectors/커넥터 이름/tasks
task 목록 조회
GET /connectors/커넥터 이름/tasks/테스크 아이디/status
task 상태 조회
문제 발생 시 에러 로그 확인 가능함
GET /connectors/커넥터 이름/topics
topic 조회
GET /connector-plugins
설치된 플러그인 목록 조회
[
{
"class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type": "sink",
"version": "5.4.0"
},
{
"class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"type": "sink",
"version": "2.0.0"
},
{
"class": "io.debezium.connector.mysql.MySqlConnector",
"type": "source",
"version": "1.7.0.Final"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "7.5.2-ccs"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "7.5.2-ccs"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "7.5.2-ccs"
},
{
"class": "streams.kafka.connect.source.Neo4jSourceConnector",
"type": "source",
"version": "2.0.0"
}
]
POST /connectors
connector 등록
예시: Elasticsearch Connector 등록
body
body
{
"name": "kafka-connect", // 이름
"config": {
"topics": "토픽이름",
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"name": "kafka-connect", // 이름
"connection.url": "연결 URL",
"connection.username": "유저",
"connection.password": "비밀번호",
"type.name": "_doc",
"key.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"schema.ignore": "true",
"compact.map.entries": "true",
"behavior.on.null.values": "delete",
"behavior.on.malformed.documents": "ignore",
"auto.create.indices.at.start": "false",
"tasks.max": "1",
"plugin.path": "/usr/share/confluent-hub-components/confluentinc-kafka-connect-elasticsearch/lib",
"offset.flush.interval.ms": 5000,
"topic.index.map": "토픽이름:인덱스 이름"
}
}
confluentinc/kafka-connect-elasticsearch:11.4.0 사용하니까 ES 6.8 버전이랑 호환 안됐음