kafka to s3

Volc·2024년 2월 1일
0

AWS

목록 보기
4/6

Kafka 설치

  • 우선 EC2를 생성해 kafka를 설치해보겠다.
  • EC2 인스턴스 설정
    • 이름 : kafka-server
    • OS : Amazon Linux 프리티어
    • 인스턴스 유형 : t2.large
  • EC2 인스턴스를 생성 했으면 이제 서버에 들어간다.
    • window는 putty로, mac에서는 터미널로, aws console에서도 들어갈 수 있다.
  • 시간을 설정한다.
    sudo timedatectl set-timezone Asia/Seoul
  • kafka 설치를 하여 압축을 푼다.
    cd /opt
    sudo mkdir kafka
    sudo chown ec2-user:ec2-user kafka
    cd kafka
    wget https://downloads.apache.org/kafka/3.6.0/kafka_2.12-3.6.0.tgz
    tar xvf kafka_2.12-3.6.0.tgz
  • java를 설치한다.
    sudo yum list | grep java
    sudo yum install -y java-11-amazon-corretto.x86_64
  • zookeeper와 kafka를 실행한다.
    cd kafka_2.12-3.6.0
    bin/zookeeper-server-start.sh config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties

    데몬으로 실행해도 되고 창을 여러개 띄워 실행해도 된다.

  • topic을 생성한다.
 bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server       localhost:9092
  • 터미널 창을 하나 더 켜서 producer를 실행한다.
 bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
  • 다른 터미널 창을 켜서 consumer를 실행한다.
 bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
  • json 형식으로 저장을 할 것이기 때문에 다음과 같이 producer에서 입력을 해주자.
{"test-key":"test-value"}

인바운드 규칙 설정

  • EC2의 보안그룹에 들어가 인바운드 규칙을 다음과 같이 편집 해준다.

aws 등록

  • AWS Access Key를 만들어줘야 한다.

    aws console 페이지 우측 상단에 아이디를 클릭하면 보안 자격 증명 창이 보인다.

  • 액세스 키를 생성해준다.
    • Command Line Interface(CLI)를 선택해준다.
    • 액세스 키와 비밀 액세스 키를 받으면 복사하여 text파일에 저장해둔다.
  • 터미널에서 aws 설정을 해줘야 한다.
sudo aws configure
  • AWS Access Key ID 입력
  • AWS Secret Access key 입력
  • Default region name : ap-northeast-2
  • Default output format : json

S3 버킷 생성

  • S3에 버킷을 생성한다.
    • 리전은 서울(ap-northeast-2)로 설정
    • 버킷 이름 : mybucket

Kafka connector 설치

  • EC2 터미널에 들어가 kafka connector를 설치해준다.
cd /opt/kafka
wget https://api.hub.confluent.io/api/plugins/confluentinc/kafka-connect-s3/versions/4.1.1/archive
unzip archive
sudo mkdir -p /opt/kafka/plugins/kafka-connect-s3
cp confluentinc-kafka-connect-s3-4.1.1/lib/* /opt/kafka/plugins/kafka-connect-s3/
  • properties를 만든다.
cd /opt/kafka/kafka_2.12-3.6.0
vim my-connect-standalone.properties
  • properties를 작성한다.
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/opt/kafka/plugins
  • kafka connector를 실행한다.
cd ..
bin/connect-standalone.sh ./config/my-connect-standalone.properties

python script 작성

  • connector에 aws S3 연결정보를 등록 하기 위해 python script를 작성해야한다.
  • 먼저 python을 설치한다.
yum install -y python3.11.x86_64
  • python script를 만든다.
cd
vim s3-config.py
  • script를 작성한다.
import requests as req
import json

s3_sink_connector = {"name": "test-s3-sink-2",
                     "config": {"connector.class": "io.confluent.connect.s3.S3SinkConnector",
                                "tasks.max": 3,
                                "topics": "quickstart-events",
                                "s3.region": "ap-northeast-2",
                                "s3.bucket.name": "mybucket",
                                "s3.compression.type": "gzip",
                                "s3.part.size": 5242880,
                                "flush.size": 1,
                                "storage.class": "io.confluent.connect.s3.storage.S3Storage",
                                "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
                                "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
                                "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
                                "partition.duration.ms": 3600000,
                                "path.format": "YYYY-MM-dd",
                                "locale": "KR",
                                "timezone": "Asia/Seoul",
                                "schema.compatibility": "NONE"
                                }
                     }

result = req.post(url="http://localhost:8083/connectors",
                  headers={"Content-Type": "application/json"}, data=json.dumps(s3_sink_connector))
print(result.json())
  • script를 실행한다.
python3 s3-config.py
  • s3에 들어가 내가 만든 버킷에 object가 만들어져 있는지 확인한다.

참고 사이트

https://velog.io/@gyounghwan1002/S3-Kafka-Sink-Connector-AWS

profile
미래를 생각하는 개발자

0개의 댓글