데이터브릭스에서 S3로 저장한 데이터를 타 시스템에 API로 제공해야하는 일이 생겼다.
구조는 크게 4가지를 생각했다.
4개의 구조를 보았을 때 하루 40만건 정도의 데이터를 조회하여 특정 row를 반환하는게 목적이라 마지막 API Gateway - lambda - S3를 사용하기로 결정
Lambda로 S3에 접근할 것이기 때문에 Lambda가 S3에 접근 할 수 있도록 정책을 먼저 생성해야 한다.
역할을 생성 했으니 이제 lambda를 생성 해보자.
S3에 접근하기 위해 코드를 작성할건데 parquet 형식의 데이터를 읽기 위해서는 pyarrow library와 처리를 위한 pandas library가 필요하다.
library를 다운받아 패키징을 한다.
library 패키징
library 설치
필자는 mac을 사용하며 architecture가 arm64인 반면 lambda의 환경은 x86_64이기 때문에 docker로 환경에 맞는 library를 다운 받는다.
다음 명령어를 터미널에서 차례로 실행한다. (docker 설치 과정은 생략)
-- lambda 환경 이미지 pull
docker pull lambci/lambda:build-python3.8
-- 현재 디렉토리에서 package 폴더를 만들고 docker run
docker run -it --rm -v $(pwd)/package:/lambda/package lambci/lambda:build-python3.8 bash
-- pandas, pyarrow 설치
pip install pandas pyarrow -t /lambda/package/python/lib/python3.8/site-packages
-- container에서 나가기
exit
-- lib 설치한 곳으로 이동
cd package/python/lib/python3.8/site-packages/
-- 불필요 파일 및 폴더 제거
find . -name '*.pyc' -delete
find . -name '__pycache__' -type d -exec rm -rf {} +
find . -name '*.dist-info' -type d -exec rm -rf {} +
find . -name '*.egg-info' -type d -exec rm -rf {} +
-- package 폴더로 다시 이동
cd ../../../../
-- 파일 압축
zip -r ../pyarrow-layer.zip .
library의 위치를 다음과 같이 하여 압축을 해야만 한다.
layer_content.zip
└ python/lib/python3.8/site-packages/설치
lambda에 library를 로드해줘야 하는데 용량 한계가 있어 폴더를 제거하지 않으면 library를 올리지 못한다.
이 경우 docker image를 ECR에 함수와 library를 docker image로 만들어 사용해 한다.
library 설치가 완료 되었으면 해당 zip 파일을 S3에 올려준다.
다시 lambda로 올라가 layer를 만들어준다.
공통으로 사용되는 패키지 또는 라이브러리를 압축하여 업로드 함으로써 여러 람다함수에서 접근하여 코드 수정 없이 사용할 수 있게 해준다.
layer를 생성 했으니 해당 함수와 연결 해준다.
lambda 함수와 layer 연결
이후 lambda 코드 소스를 작성한다.
코드 작성 후 deploy를 눌러 코드를 반영한다.
import json
import boto3
import pyarrow.parquet as pq
import pyarrow as pa
import logging
import io
import pandas as pd
# Logger 설정
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# S3 클라이언트 생성
s3_client = boto3.client('s3')
def lambda_handler(event, context):
try:
# API Gateway에서 id S3 버킷 정보 가져오기
id_number = event.get('queryStringParameters', {}).get('id')
if not id_number:
raise ValueError("Query parameter 'id' is required.")
bucket = 'bucket_name'
prefix = 'object_name/yyyyddmm/' # 파일 경로의 접두사 (폴더와 비슷한 개념)
if not id_number or not bucket:
raise ValueError("Event must contain 'id' and 'bucket'.")
# S3에서 파일 목록 가져오기
response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
if 'Contents' not in response:
raise ValueError(f"No files found in bucket {bucket} with prefix {prefix}.")
# 모든 Parquet 파일에서 데이터 읽기 및 필터링
all_filtered_rows = []
for item in response['Contents']:
key = item['Key']
if key.endswith('.parquet'):
logger.info(f"Processing file: {key}")
# S3에서 Parquet 파일 다운로드
parquet_data = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
# pyarrow를 사용하여 Parquet 파일을 읽기
table = pq.read_table(io.BytesIO(parquet_data))
# DataFrame으로 변환
df = table.to_pandas()
# contractNumber로 필터링
filtered_df = df[df['id'] == id_number]
# dict로 변환하여 리스트에 추가
all_filtered_rows.extend(filtered_df.to_dict(orient='records'))
if not all_filtered_rows:
logger.info(f"No rows found for contractNumber: {contract_number}")
return {
'statusCode': 200,
'body': json.dumps({'message': 'No matching rows found'})
}
logger.info(f"Filtered rows: {all_filtered_rows}")
return {
'statusCode': 200,
'body': json.dumps(all_filtered_rows)
}
except Exception as e:
logger.error(f"Error processing the request: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
필자는 json으로 id를 받아 그에 해당 되는 정보를 S3에서 조회하여 반환하는 코드를 짰다.
코드를 다 짰으면 API Gateway를 생성하여 연결하자.
API 생성이 되었을 거고 AWS Console에 api gateway를 입력하여 새로 만든 api를 설정해준다.
모든게 완료 되었으니 테스트를 해보자
테스트
결과