쿠버네티스에 카프카 커넥트 and debezium 구축

유현민·2022년 10월 18일
0

인턴 과제

목록 보기
11/15

debezium에서 만든 이미지를 사용했다. 환경 변수 설정하는 데서 시간을 오래 잡아먹었는데 처음에는 s CONNECT_를 앞에 붙여야 하는 줄 알았는데 알고 보니 빼고 써야 했다.

statefulset으로 구성을 했다.

debezium.yaml

bootstrap_servers -> 여기에 자신이 구축한 카프카 브로커 서버를 써주면 된다.

apiVersion: v1
kind: Service
metadata:
    name: connect-service
    namespace: debezium
    labels:
      name: connect
spec:
    ports:
    - port: 8083
      name: debezium-port
      protocol: TCP
      nodePort: 30651
    selector:
      app: connect
    type: LoadBalancer
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: connect
  namespace: debezium
spec:
  selector:
    matchLabels:
      app: connect
  serviceName: connect-service
  replicas: 2
  updateStrategy:
    type: RollingUpdate
  podManagementPolicy: OrderedReady
  template:
    metadata:
      labels:
        app: connect
    spec:
      restartPolicy: Always
      containers:
      - name: kafka-connect
        image: debezium/connect
        resources:
          requests: 
            cpu: "1"
            memory: 1Gi
          # limits:
          #   cpu: "2"
          #   memory: 1Gi
        ports:
        - containerPort: 8083
          name: connect
        - containerPort: 3306
          name: sql
        
        volumeMounts:
        - name: connect
          mountPath: var/lib/plugin
        env:
        - name: MY_POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        - name: BOOTSTRAP_SERVERS
          value: kafka-broker-0.kafka-service.default.svc.cluster.local:9092 ,kafka-broker-1.kafka-service.default.svc.cluster.local:9092,kafka-broker-2.kafka-service.default.svc.cluster.local:9092
        - name: GROUP_ID
          value: connect
        - name: CONFIG_STORAGE_TOPIC
          value: configs
        - name: CONFIG_STORAGE_REPLICATION_FACTOR
          value: "1"
        - name: OFFSET_STORAGE_TOPIC
          value: offsets
        - name: OFFSET_STORATE_REPLICATION_FACTOR
          value: "1"
        - name: STATUS_STORAGE_TOPIC
          value: statuses
        - name: STATUS_STORAGE_REPLICATION_FACTOR
          value: "1"
        - name: KEY_CONVERTER
          value: org.apache.kafka.connect.storage.StringConverter
        - name: VALUE_CONVERTER
          value: org.apache.kafka.connect.converters.ByteArrayConverter
        - name: INTERNAL_KEY_CONVERTER
          value: org.apache.kafka.connect.json.JsonConverter
        - name: INTERNAL_VALUE_CONVERTER
          value: org.apache.kafka.connect.json.JsonConverter
        - name: REST_ADVERTISED_HOST_NAME
          value: $(MY_POD_IP)
        - name: REPLICATION_FACTOR
          value: "1"
        - name: REST_ADVERTISED_LISTENER
          value: PLAINTEXT://$(MY_POD_IP)

  volumeClaimTemplates:
  - metadata:
      name: connect
    spec:
      accessModes: ["ReadWriteOnce"]
      storageClassName: "kafka-debezium"
      resources:
        requests:
          storage: 10Gi

pv.yaml

pv이름은 본인 자유다

apiVersion: v1
kind: PersistentVolume
metadata:
  name: connect-0
spec:
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteOnce
  persistentVolumeReclaimPolicy: Delete
  storageClassName: "kafka-debezium"
  hostPath:
    path: /data/
    type: DirectoryOrCreate

debezium 이미지 안에는 debezium 커넥터가 포함되어 있기 때문에 따로 다운로드하지 않아도 된다.

rest

테스트해보니 rest 요청은
외부에서는 퍼블릭 IP:nodeport
내부에서는 포드 IP:debezium port, 노드 private IP:nodeport

profile
smilegate megaport infra

0개의 댓글