[AWS][Python][Athena] 파이썬으로 아테나에 쿼리하기 (boto3 vs pyathena vs awswrangler)

홍성환·2022년 1월 6일
4

python

목록 보기
3/3
post-thumbnail

1. Intro

  • athena는 S3에 있는 데이터에 쿼리를 날릴 수 있는 서버리스 쿼리엔진이다.
  • S3에 데이터만 잘 준비해있으면 바로 쿼리를 날릴 수 있기 때문에 매우 편하다.
  • Athena UI에서 쿼리를 날릴 때는 별 준비사항이 없는데, 파이썬으로 쿼리 결과를 받아오려면 어떻게 해야할까?
  • 쿼리 결과를 pandas dataframe으로 받아와서 여러가지 다른 처리나 EDA를 하고 싶을 수 있다.

2. How Athena Query Work?

2-1 Select Query

  • Api를 통해서 아테나에 쿼리를 요청한다. 요청이 성공하면 쿼리 아이디를 바로 리턴해준다.
  • 쿼리 아이디를 통해 쿼리의 현재 상태를 요청할 수 있는데, 실패, 성공, 큐, 실행중 등 여러 상태가 있다.
  • 아테나에 select 쿼리를 날리게 되면 무조건 "csv"로 쿼리 결과가 S3에 떨어지게 된다.
  • 따라서 다음과 같은 과정을 거처야 한다.
    1. 쿼리 실행
    2. 쿼리 상태 성공할 때 까지 계속 해서 확인
    3. 쿼리 성공시 S3에 있는 결과를 읽어오기

2-2 CTAS Query

  • CTAS 쿼리는 create table as select의 약자이다.
  • 테이블을 새로 만드는데 쿼리 결과에 따라 만드는 것을 뜻한다.
  • Athena에서는 Parquet, ORC등 압축된 파일에 쿼리를 날리고 데이터를 저장하는 것이 가능하다.
  • CTAS 쿼리를 통해 테이블을 새로 만들때도 ORC(zlib), Parquet(snappy)을 사용할 수 있다.
  • CTAS 쿼리 결과 또한 S3로 떨어지게 되며, athena에서 해당 데이터의 테이블을 볼 수 있다.

2-3 단순 select 쿼리를 날릴 때도 CTAS가 좋은이유

  • 단순 select 쿼리는 'csv'로 밖에 저장을 못 한다.
  • ORC, Parquet는 csv보다 용량이 많게는 10배 이상으로 적다.
  • 하지만, 단순 select 쿼리는 무조건 csv로만 저장한다.
    • 따라서 ctas는 select보다 S3로 저장할때, S3에서 다시 데이터를 불러올 때 data transfer 비용을 절약할 수 있고 속도도 빠르다.
  • 따라서 원하는 쿼리를 CTAS로 바꾸어 임시로 필요 없는 테이블을 만들어 데이터 결과를 ORC나 Parquet로 저장 후 읽어오면 대용량 데이터일 경우 엄청 빨라진다.
  • 하지만 임시로 필요없는 테이블을 만들고, 다시 그 테이블을 지워야 하고 CTAS 쿼리는 단순 SELECT 쿼리보다 복잡하게 생겼기 때문에 직관적이지 않아서 라이브러리를 만들지 않고서는 쓰기 힘들다.

2. Boto3

  • Amazon의 GUI 기능을 python에서 쓸 때는 보통 boto3를 생각하게 된다.
  • athena에서도 역시 boto3를 통해서 쿼리를 날릴 수 있다.
  • 하지만 boto3에서는 아주 기본적인 기능만 제공해서 쓰기 정말 힘들다.
    1. 쿼리 실행
    2. 쿼리 상태 성공할 때 까지 계속 해서 확인
  • 위 2개 기능만 제공한다고 생각하면 된다.
  • 쿼리가 끝난 후 s3에서 데이터를 읽어오고 계속해서 쿼리가 끝났는지 체크하는 것이 매우 힘들다.
  • 진짜 딱 기본기능만 원하면 Boto3도 좋은 선택이 될 수도 있다.

3. Pyathena

3-1 Cursor

  • 아래 처럼 pep-249의 cursor object를 만들고 쿼리를 날릴 수 있다.
from pyathena import connect

cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                 region_name="us-west-2").cursor()
cursor.execute("SELECT * FROM one_row")
print(cursor.description)
print(cursor.fetchall())

3-2 iteration

  • 처리해야할 데이터가 너무 클 때는 아래처럼 조금씩 fetch하면서 데이터를 처리할 수 있다.
from pyathena import connect

cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                 region_name="us-west-2").cursor()
cursor.execute("SELECT * FROM many_rows LIMIT 10")
for row in cursor:
    print(row)

3-3 pd.read_sql_query

  • 아래 처럼 pd.read_sql_query로 하번에 데이터 결과를 받아올 수도 있다.
from pyathena import connect
import pandas as pd

conn = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
               region_name="us-west-2")
df = pd.read_sql_query("SELECT * FROM many_rows", conn)
print(df.head())

3-4 기타

  • 이외에도 asyncio를 사용할 수 있다.
  • pd.read_sql 뿐만 아니라 pd.to_sql 편하게 작동 시킬 수 있다. (list 같은 경우는 잘 작동하지 않는다.)

3-5 단점

pyathena는 select 쿼리 결과를 무조건 csv로 저장한다.
위에서 설명한대로 CTAS 쿼리를 사용하면 parquet, orc로 저장할 수 있는데, 이 작업을 유저가 직접해줘야 한다.
라이브러리에서는 쿼리만 받으면 자동으로 CTAS쿼리를 생성하고, 쿼리가 끝나면 테이블을 삭제할 수는 없을까..? ㅜ_ㅜ
또한 pd.to_sql이 list와 같은 데이터 타입에서는 잘 작동되지 않는 단점도 있다.

4. AWS Data Warngler

4-1 read_sql_query

read_sql_query는 CTAS를 통해 자동으로 데이터를 parquet로 저장한다.
만들어진 테이블은 자동으로 쿼리가 끝나면 지워준다.
데이터가 커질 수록 쿼리 성능에 아주 큰 차이가 존재한다.

  • 아래는 튜토리얼 링크에 존재하는 정리 페이지인데 CTAS와 csv 방식의 장단점이 기술되어 있다.

4-2 기본

  • querty ,database, workgroup
dfs = wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    workgroup='my-name'
)
  • 위처럼 원하는 쿼리를 날릴 수 있다.
  • database는 wrangler용도로 따로 만들어 두는 것을 추천 하는데 해당 database에 쿼리 결과를 임시로 저장할 테이블을 만들기 때문이다.
  • 위의 예제에서 SELECT * FROM noaa 이런 쿼리를 날려도 실제로는 CRATE TABLE {database}.tmp123123...생략... * FROM noaa 이런식으로 쿼리를 아테나에 요청하게 된다.
  • 이때 프로세스가 갑자기 꺼지거나 이러면 wargnler가 생성한 임시 테이블이 해당 database에서 지워지지 않은채 남아있는 것을 볼 수 있다. 이 테이블은 직접 지워야하는 불편함이 있긴하다.

4-3 chunk

  • 쿼리 결과가 너무 클때는 chunking하여 데이터 결과를 받아올 수도 있다.

dfs = wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    chunksize=100_000_000
)

for df in dfs:  # Batching
    print(len(df.index))

4-4 s3.to_parquet, to_csv, to_json

  • pyathena에서는 pd.to_sql로 편하게 df를 athena 테이블과 연결된 s3로 데이터를 올릴 수 있었다.
    • 하지만 list는 잘 안되는 단점이 있었다.
  • parquet일 경우만 wrangler에서도 df를 편하게 s3로 올릴 수 있다.
    • ORC는 안 되고 parquet(snappy)만 되서 너무 아쉽다.
    • ORC(Zlib)의 성능이 더 좋기 때문이다.
  • 아래 예시 처럼 파티션, db이름, 테이블 이름을 지정해서 쓸 수 있다.
  • 자동으로 해당 테이블이 저장되어야할 경로로 데이터가 저장된다.
  • 단순히 s3로 데이터를 올리기만 하는 것이 아니라 타입체크 등 이 데이터가 해당 테이블에 적합하지 않으면 에러가 나게 할 수 있기 때문에 마음이 편하다.
import awswrangler as wr
import pandas as pd
wr.s3.to_parquet(
    df=pd.DataFrame({
        'col': [1, 2, 3],
        'col2': ['A', 'A', 'B']
    }),
    dataset=True,
    partition_cols=['col2'],
    database='default',  # Athena/Glue database
    table='my_table'  # Athena/Glue table
)

5. 결론

  • python 환경에서 athena,s3의 데이터에 접근하는 방법을 알아봄
  • boto3 : 기본 기능만 원할 떄 좋다
  • pyathena: 표준 적인 cursor object를 원할 떄 좋다
  • wrangler: parquet로 데이터를 읽고 쓰고 다양한 기능을 원할 떄 좋다.
profile
Machine Learning Engineer: recsys, mlops

0개의 댓글