pip install kafka-python
pip install boto3
# pip install kafka-python
# pip install boto3
# pip install opencv-python
import subprocess
from kafka import KafkaConsumer
from kafka import KafkaProducer
from json import loads
import environ
import boto3
import cv2
import os
BUCKET_NAME = os.environ['bucket_name']
s3 = boto3.client(
service_name='s3',
region_name=os.environ['Region_name'],
aws_access_key_id=os.environ['aws_access_key_id'],
aws_secret_access_key=os.environ['aws_secret_access_key'],
)
# Kafka Producer 셋팅
producer = KafkaProducer(
acks=0,
compression_type='gzip',
bootstrap_servers=[os.environ['Kafka_IP']], # IP주소
value_serializer=lambda v: dumps(v).encode('utf-8'),
)
bucket_name = os.environ['bucket_name'] # 버켓 이름
csvfile = 'csv파일' # 저장할 csv 데이터
csvfilepath = '/' + csvfile # 저장할 파일 이름
# s3에 업로드
s3.upload_file(csvfile, bucket_name, 'csv' + csvfilepath)
producer.send('sendunity', {
'csv': str(num) + '.csv',
})
time.sleep(0.2) # 부하를 막기 위해 0.2초 쉬기
producer.flush() # 데이터 비우기
# 이미지 압축
subprocess.call('tar -zcvf' + filepath[:-4] + '*' + '.jpg', shell=True)
# s3에 이미지 파일 백업
s3.upload_file(zipfile, bucket_name, 'zip' + csvfilepath)
# 압축파일, 동영상 삭제
subprocess.call('rm -rf /' + filepath[:-4] + '.tar.gz, /' + filepath[:-4] + ', /' + filepath, shell=True)
subprocess.call('rm -rf /*.csv', shell=True)
S3
Broker