회사에서는 자바 혹은 노드로 서버단을 처리하지만, 프로젝트에 적용하기 전 실습이니만큼 파이썬으로 기능을 구현하고자 한다
aws configure가 마무리 되었다는 가정하에 본 포스트를 작성한다
데모용이므로 온디맨드로 가볍게 만든다. 데이터 스트림을 생성하는 것은 복잡하지 않다
데이터 스트림에서 데이터를 받아와 S3로 보낼 예정이므로, 소스와 데스티네이션을 알맞게 설정한다 후에 필요할 수 있지만, 람다를 사용하거나 데이터 포멧을 변경하는건 disabled로 저장한다타겟에 해당하는 S3 버킷의 저장 위치를 설정한다. eeyoontaek-pipeline-demo
라는 버킷의 raw/
폴더에 저장한다
import boto3
import time
import json
# 만약 aws configure이 설정되어있다면
kinesis_client = boto3.client('kinesis')
# aws configure이 설정되어있지 않았을 때
kinesis_client = boto3.client('kinesis', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY, region_name=REGION_NAME)
def put_records(records):
kinesis_records = []
for r in records:
kinesis_records.append({
"Data": json.dumps(r).encode('utf-8'),
"PartitionKey": "string_for_partition"
})
response = kinesis_client.put_records(
Records=kinesis_records,
StreamName='kinesis-demo'
)
return response
def main():
while True:
print('Start to send')
data = [
{
'time': time.time()
},
{
'time': time.time() + 10
}
]
response = put_records(data)
print('response : {}'.format(response))
time.sleep(10)
if __name__ == "__main__":
main()
코드를 실행하면 위와 같이 10초마다 데이터를 전송한다
위의 사진과 같이 데이터 전송이 완료된 시각으로 연/월/일/시 폴더에 담기는 것을 확인할 수 있다