# Set the host timezone and install Kafka 3.6.0 under /opt/kafka (owned by ec2-user).
sudo timedatectl set-timezone Asia/Seoul
cd /opt
sudo mkdir -p kafka
sudo chown ec2-user:ec2-user kafka
cd kafka
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.12-3.6.0.tgz
tar xvf kafka_2.12-3.6.0.tgz
# Kafka needs a JDK; Corretto 11 is Amazon's OpenJDK 11 build.
sudo yum list | grep java
sudo yum install -y java-11-amazon-corretto.x86_64
cd kafka_2.12-3.6.0
# Start ZooKeeper first, then the broker, both with -daemon so they run in
# the background — a foreground start would block every command that follows.
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
데몬으로 실행해도 되고, 터미널 창을 여러 개 띄워 각각 포그라운드로 실행해도 된다.
# Create a test topic on the local broker.
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
# Interactive console producer: type one message per line (Ctrl-C to quit).
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
# Console consumer; --from-beginning replays the topic from the earliest offset.
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
# Sample JSON record to type into the producer (the S3 sink below writes JSON).
{"test-key":"test-value"}
AWS 콘솔 페이지 우측 상단의 아이디를 클릭하면 보안 자격 증명 메뉴가 보인다. 여기서 액세스 키를 발급받는다.
# Configure AWS credentials for the CURRENT user — do not use sudo here:
# 'sudo aws configure' writes to /root/.aws, which the connector process
# running as ec2-user cannot read.
aws configure
cd /opt/kafka
# Download the Confluent S3 sink connector plugin (saved as 'archive') and unpack it.
wget https://api.hub.confluent.io/api/plugins/confluentinc/kafka-connect-s3/versions/4.1.1/archive
unzip archive
# /opt/kafka is already owned by ec2-user, so no sudo is needed; 'sudo mkdir'
# would leave the directory root-owned and the cp below would fail with
# "Permission denied".
mkdir -p /opt/kafka/plugins/kafka-connect-s3
cp confluentinc-kafka-connect-s3-4.1.1/lib/* /opt/kafka/plugins/kafka-connect-s3/
cd /opt/kafka/kafka_2.12-3.6.0
vim my-connect-standalone.properties
# Kafka Connect standalone worker configuration.
bootstrap.servers=localhost:9092
# JSON converters without embedded schemas — matches the plain JSON records
# produced by the console producer above.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# Standalone mode stores source-connector offsets in a local file.
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
# Parent directory of the kafka-connect-s3 plugin copied in earlier.
plugin.path=/opt/kafka/plugins
# Run from the Kafka directory itself: the original 'cd ..' broke both paths —
# /opt/kafka has no bin/, and the properties file was created in
# kafka_2.12-3.6.0/ (the current directory), not in config/.
bin/connect-standalone.sh my-connect-standalone.properties
# Install Python 3.11 (sudo is required for yum, as with the installs above)
# plus pip, then the 'requests' library used by s3-config.py below.
sudo yum install -y python3.11.x86_64 python3.11-pip
python3.11 -m pip install --user requests
cd
vim s3-config.py
import requests as req

# Connector definition posted to the Kafka Connect REST API
# (standalone worker listening on localhost:8083).
s3_sink_connector = {
    "name": "test-s3-sink-2",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": 3,
        "topics": "quickstart-events",
        "s3.region": "ap-northeast-2",
        "s3.bucket.name": "mybucket",
        "s3.compression.type": "gzip",
        "s3.part.size": 5242880,
        # flush.size=1 writes one S3 object per record — convenient for a
        # demo, far too many small objects for production.
        "flush.size": 1,
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
        "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
        "partition.duration.ms": 3600000,
        "path.format": "YYYY-MM-dd",
        "locale": "KR",
        "timezone": "Asia/Seoul",
        "schema.compatibility": "NONE",
    },
}

# json= makes requests serialize the body and set the
# Content-Type: application/json header itself.
result = req.post(url="http://localhost:8083/connectors", json=s3_sink_connector)
# Fail loudly on an error response (expected: 201 Created;
# 409 Conflict means a connector with this name already exists).
result.raise_for_status()
print(result.json())
# Use the interpreter installed above — the distro's default 'python3'
# may be an older version without the --user-installed requests package.
python3.11 s3-config.py
참고 자료: https://velog.io/@gyounghwan1002/S3-Kafka-Sink-Connector-AWS