1. Intro
- athena는 S3에 있는 데이터에 쿼리를 날릴 수 있는 서버리스 쿼리엔진이다.
- S3에 데이터만 잘 준비해있으면 바로 쿼리를 날릴 수 있기 때문에 매우 편하다.
- Athena UI에서 쿼리를 날릴 때는 별 준비사항이 없는데, 파이썬으로 쿼리 결과를 받아오려면 어떻게 해야할까?
- 쿼리 결과를 pandas dataframe으로 받아와서 여러가지 다른 처리나 EDA를 하고 싶을 수 있다.
2. How Athena Query Work?
2-1 Select Query
- Api를 통해서 아테나에 쿼리를 요청한다. 요청이 성공하면 쿼리 아이디를 바로 리턴해준다.
- 쿼리 아이디를 통해 쿼리의 현재 상태를 요청할 수 있는데,
실패, 성공, 큐, 실행중
등 여러 상태가 있다.
- 아테나에 select 쿼리를 날리게 되면 무조건 "csv"로 쿼리 결과가 S3에 떨어지게 된다.
- 따라서 다음과 같은 과정을 거처야 한다.
- 쿼리 실행
- 쿼리 상태 성공할 때 까지 계속 해서 확인
- 쿼리 성공시 S3에 있는 결과를 읽어오기
2-2 CTAS Query
- CTAS 쿼리는 create table as select의 약자이다.
- 테이블을 새로 만드는데 쿼리 결과에 따라 만드는 것을 뜻한다.
- Athena에서는 Parquet, ORC등 압축된 파일에 쿼리를 날리고 데이터를 저장하는 것이 가능하다.
- CTAS 쿼리를 통해 테이블을 새로 만들때도 ORC(zlib), Parquet(snappy)을 사용할 수 있다.
- CTAS 쿼리 결과 또한 S3로 떨어지게 되며, athena에서 해당 데이터의 테이블을 볼 수 있다.
2-3 단순 select 쿼리를 날릴 때도 CTAS가 좋은이유
- 단순 select 쿼리는 'csv'로 밖에 저장을 못 한다.
- ORC, Parquet는 csv보다 용량이 많게는 10배 이상으로 적다.
- 하지만, 단순 select 쿼리는 무조건 csv로만 저장한다.
- 따라서 ctas는 select보다 S3로 저장할때, S3에서 다시 데이터를 불러올 때 data transfer 비용을 절약할 수 있고 속도도 빠르다.
- 따라서 원하는 쿼리를 CTAS로 바꾸어 임시로 필요 없는 테이블을 만들어 데이터 결과를 ORC나 Parquet로 저장 후 읽어오면 대용량 데이터일 경우 엄청 빨라진다.
- 하지만 임시로 필요없는 테이블을 만들고, 다시 그 테이블을 지워야 하고 CTAS 쿼리는 단순 SELECT 쿼리보다 복잡하게 생겼기 때문에 직관적이지 않아서 라이브러리를 만들지 않고서는 쓰기 힘들다.
2. Boto3
- Amazon의 GUI 기능을 python에서 쓸 때는 보통 boto3를 생각하게 된다.
- athena에서도 역시 boto3를 통해서 쿼리를 날릴 수 있다.
- 하지만 boto3에서는 아주 기본적인 기능만 제공해서 쓰기 정말 힘들다.
- 쿼리 실행
- 쿼리 상태 성공할 때 까지 계속 해서 확인
- 위 2개 기능만 제공한다고 생각하면 된다.
- 쿼리가 끝난 후 s3에서 데이터를 읽어오고 계속해서 쿼리가 끝났는지 체크하는 것이 매우 힘들다.
- 진짜 딱 기본기능만 원하면 Boto3도 좋은 선택이 될 수도 있다.
3. Pyathena
3-1 Cursor
- 아래 처럼 pep-249의 cursor object를 만들고 쿼리를 날릴 수 있다.
from pyathena import connect
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute("SELECT * FROM one_row")
print(cursor.description)
print(cursor.fetchall())
3-2 iteration
- 처리해야할 데이터가 너무 클 때는 아래처럼 조금씩 fetch하면서 데이터를 처리할 수 있다.
from pyathena import connect
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute("SELECT * FROM many_rows LIMIT 10")
for row in cursor:
print(row)
3-3 pd.read_sql_query
- 아래 처럼 pd.read_sql_query로 하번에 데이터 결과를 받아올 수도 있다.
from pyathena import connect
import pandas as pd
conn = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2")
df = pd.read_sql_query("SELECT * FROM many_rows", conn)
print(df.head())
3-4 기타
- 이외에도 asyncio를 사용할 수 있다.
- pd.read_sql 뿐만 아니라 pd.to_sql 편하게 작동 시킬 수 있다. (list 같은 경우는 잘 작동하지 않는다.)
3-5 단점
pyathena는 select 쿼리 결과를 무조건 csv로 저장한다.
위에서 설명한대로 CTAS 쿼리를 사용하면 parquet, orc로 저장할 수 있는데, 이 작업을 유저가 직접해줘야 한다.
라이브러리에서는 쿼리만 받으면 자동으로 CTAS쿼리를 생성하고, 쿼리가 끝나면 테이블을 삭제할 수는 없을까..? ㅜ_ㅜ
또한 pd.to_sql이 list와 같은 데이터 타입에서는 잘 작동되지 않는 단점도 있다.
4. AWS Data Warngler
- AWS data wrangler : https://github.com/awslabs/aws-data-wrangler
- wrangler는 아테나, s3, emr, redshift 등등 aws의 데이터가 존재하는 거의 모든 곳에 데이터와 관련된 기능을 제공하는 라이브러리이다.
- 엄청나게 방대한 기능이 있지만 지금은 athena와 관련된 기능만 살펴보자!
- 아래 튜토리얼을 통해 아테나 사용법을 대강 익힐 수도 있다.
- AWS Data Warngler는 pyathena 처럼 PEP-249를 구현한 cursor object를 제공해주진 않지만 여러가지 편한 함수를 제공한다.
4-1 read_sql_query
read_sql_query는 CTAS를 통해 자동으로 데이터를 parquet로 저장한다.
만들어진 테이블은 자동으로 쿼리가 끝나면 지워준다.
데이터가 커질 수록 쿼리 성능에 아주 큰 차이가 존재한다.
- 아래는 튜토리얼 링크에 존재하는 정리 페이지인데 CTAS와 csv 방식의 장단점이 기술되어 있다.
4-2 기본
- querty ,database, workgroup
dfs = wr.athena.read_sql_query(
"SELECT * FROM noaa",
database="awswrangler_test",
workgroup='my-name'
)
- 위처럼 원하는 쿼리를 날릴 수 있다.
- database는 wrangler용도로 따로 만들어 두는 것을 추천 하는데 해당 database에 쿼리 결과를 임시로 저장할 테이블을 만들기 때문이다.
- 위의 예제에서
SELECT * FROM noaa
이런 쿼리를 날려도 실제로는 CRATE TABLE {database}.tmp123123...생략... * FROM noaa
이런식으로 쿼리를 아테나에 요청하게 된다.
- 이때 프로세스가 갑자기 꺼지거나 이러면 wargnler가 생성한 임시 테이블이 해당 database에서 지워지지 않은채 남아있는 것을 볼 수 있다. 이 테이블은 직접 지워야하는 불편함이 있긴하다.
4-3 chunk
- 쿼리 결과가 너무 클때는 chunking하여 데이터 결과를 받아올 수도 있다.
dfs = wr.athena.read_sql_query(
"SELECT * FROM noaa",
database="awswrangler_test",
chunksize=100_000_000
)
for df in dfs: # Batching
print(len(df.index))
4-4 s3.to_parquet, to_csv, to_json
- pyathena에서는 pd.to_sql로 편하게 df를 athena 테이블과 연결된 s3로 데이터를 올릴 수 있었다.
- parquet일 경우만 wrangler에서도 df를 편하게 s3로 올릴 수 있다.
- ORC는 안 되고 parquet(snappy)만 되서 너무 아쉽다.
- ORC(Zlib)의 성능이 더 좋기 때문이다.
- 아래 예시 처럼 파티션, db이름, 테이블 이름을 지정해서 쓸 수 있다.
- 자동으로 해당 테이블이 저장되어야할 경로로 데이터가 저장된다.
- 단순히 s3로 데이터를 올리기만 하는 것이 아니라 타입체크 등 이 데이터가 해당 테이블에 적합하지 않으면 에러가 나게 할 수 있기 때문에 마음이 편하다.
import awswrangler as wr
import pandas as pd
wr.s3.to_parquet(
df=pd.DataFrame({
'col': [1, 2, 3],
'col2': ['A', 'A', 'B']
}),
dataset=True,
partition_cols=['col2'],
database='default', # Athena/Glue database
table='my_table' # Athena/Glue table
)
5. 결론
- python 환경에서 athena,s3의 데이터에 접근하는 방법을 알아봄
- boto3 : 기본 기능만 원할 떄 좋다
- pyathena: 표준 적인 cursor object를 원할 떄 좋다
- wrangler: parquet로 데이터를 읽고 쓰고 다양한 기능을 원할 떄 좋다.