개요
- S3 -> Kafka -> Spark
- S3로부터 mp4 파일을 다운로드 받는 코드 &
- Kafka-python API를 사용하여 Python Consumer 개발 예정

Install
pip install opencv-python
코드
import subprocess
from kafka import KafkaConsumer
from json import loads
import environ
import boto3
import cv2
import os
BUCKET_NAME = os.environ['bucket_name']
s3 = boto3.client(
service_name='s3',
region_name=os.environ['Region_name'],
aws_access_key_id=os.environ['aws_access_key_id'],
aws_secret_access_key=os.environ['aws_secret_access_key'],
)
consumer = KafkaConsumer(
'video',
bootstrap_servers=['브로커주소:포트번호'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8')),
consumer_timeout_ms=1000
)
while True:
for message in consumer:
s3File = message.value['title']
s3.download_file(BUCKET_NAME, 'BUCKET_URL', 'LOCAL_URL')
filepath = '/' + str(s3File)
video = cv2.VideoCapture(filepath)
try:
if not os.path.exists('/' + filepath[:-4]):
os.makedirs(filepath[:-4])
except OSError:
print('Error: Creating directory. ' + filepath[:-4])
num = 1
count = 0
while(video.isOpened()):
try:
ret, image = video.read()
image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
image = cv2.resize(image, (800, 400))
except:
break;
if(int(video.get(1)) % num == 0):
cv2.imwrite('/'+filepath[:-4] + "/" + filepath[:-4] + "_%d.jpg" % count, image)
count += 1
video.release()
결과
- 이미지를 로컬에 저장됨
- 이미지를 한프레임마다 나누어서 저장됨
- 이미지는 사이즈 변경이 가능함 > default 값으로 800*400으로 설정
- 이미지 전처리가 빠르게 될 수 있도록 흑백처리
- 동영상의 이름과 동일한 디렉토리에 저장할 예정
- 이미지 전처리 부분은 다른 조원이 맡음

URL
Walker