Kafka Connector REST API 정리

min·2023년 12월 12일
0

REST API

GET /connectors/

connector 목록 조회

GET /connectors?expand=status&expand=info

connector 상태, 정보 상세 조회

특정 커넥터만 보고 싶은 경우에는 /connectors 하위에 커넥터 이름 붙임

{
    "kafka-connect": {
        "status": {
            "name": "kafka-connect",
            "connector": {
                "state": "RUNNING",
                "worker_id": "localhost:8083"
            },
            "tasks": [
                {
                    "id": 0,
                    "state": "RUNNING",
                    "worker_id": "localhost:8083"
                }
            ],
            "type": "sink"
        },
        "info": {
            "name": "kafka-connect",
            "config": {
                "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
                "type.name": "_doc",
                "behavior.on.null.values": "delete",
                "auto.create.indices.at.start": "false",
                "connection.password": "비밀번호",
                "topics": "토픽이름",
                "tasks.max": "1",
                "connection.username": "유저",
                "plugin.path": "/usr/share/confluent-hub-components/confluentinc-kafka-connect-elasticsearch/lib",
                "compact.map.entries": "true",
                "key.ignore": "true",
                "schema.ignore": "true",
                "behavior.on.malformed.documents": "ignore",
                "offset.flush.interval.ms": "5000",
                "key.converter.schemas.enable": "false",
                "topic.index.map": "토픽이름:인덱스 이름",
                "name": "kafka-connect",
                "value.converter.schemas.enable": "false",
                "connection.url": "접속 정보",
                "value.converter": "org.apache.kafka.connect.json.JsonConverter",
                "key.converter": "org.apache.kafka.connect.json.JsonConverter"
            },
            "tasks": [
                {
                    "connector": "kafka-connect",
                    "task": 0
                }
            ],
            "type": "sink"
        }
    }
}

GET /connctors/커넥터 이름/config

구성 정보 조회

DELETE /connectors/커넥터 이름

connector 삭제

GET /connectors/커넥터 이름/tasks

task 목록 조회

GET /connectors/커넥터 이름/tasks/테스크 아이디/status

task 상태 조회

문제 발생 시 에러 로그 확인 가능함

GET /connectors/커넥터 이름/topics

topic 조회

GET /connector-plugins

설치된 플러그인 목록 조회

[
    {
        "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type": "sink",
        "version": "5.4.0"
    },
    {
        "class": "streams.kafka.connect.sink.Neo4jSinkConnector",
        "type": "sink",
        "version": "2.0.0"
    },
    {
        "class": "io.debezium.connector.mysql.MySqlConnector",
        "type": "source",
        "version": "1.7.0.Final"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
        "type": "source",
        "version": "7.5.2-ccs"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
        "type": "source",
        "version": "7.5.2-ccs"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "type": "source",
        "version": "7.5.2-ccs"
    },
    {
        "class": "streams.kafka.connect.source.Neo4jSourceConnector",
        "type": "source",
        "version": "2.0.0"
    }
]

POST /connectors

connector 등록

예시: Elasticsearch Connector 등록

body
body

{
  "name": "kafka-connect", // 이름
  "config": {
    "topics": "토픽이름", 
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "name": "kafka-connect", // 이름
    "connection.url": "연결 URL",
    "connection.username": "유저",
    "connection.password": "비밀번호",
    "type.name": "_doc",
    "key.ignore": "true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "schema.ignore": "true",
    "compact.map.entries": "true",
    "behavior.on.null.values": "delete",
    "behavior.on.malformed.documents": "ignore",  
    "auto.create.indices.at.start": "false",
    "tasks.max": "1",
    "plugin.path": "/usr/share/confluent-hub-components/confluentinc-kafka-connect-elasticsearch/lib",
    "offset.flush.interval.ms": 5000,
    "topic.index.map": "토픽이름:인덱스 이름"
  }
}

confluentinc/kafka-connect-elasticsearch:11.4.0 사용하니까 ES 6.8 버전이랑 호환 안됐음

profile
기록으로 기억하기

0개의 댓글