[AWS]S3를 이용한 ETL (feat.awswrangler)

포동동·2022년 10월 25일
0
post-thumbnail

😍 두근두근 S3, Athena 입성기

코딩테스트로 처음으로 본인의 의지대로 S3와 athena를 써보았다. 그러면서 겪은 나의 눈물겨운 ETL을 기록해두려한다.


우선, S3는 airflow 강의 당시 개념만 익혀두었고, athena는 건너건너 이름만 들어봤기 때문에 "아~Data Lake~대화식 쿼리 서비스~"같은 느낌만 가지고 들어가보았다.

실전파인 나는 일단 aws 콘솔을 키고 bucket을 만들고 이것저것 만져보았다. csv 파일을 업로드 하고 이걸 parquet파일로 변환하는 과정을 하면서 생각보다 어렵지 않게 boto를 이용해서 작업을 수행했다.


세상살이 쉽지 않지ㅇㅇ

그런데 이게 웬걸. 그냥 parquet 파일로 던지면 안 되고 athena를 잘 활용하기 위해서는 partitioning이라는 개념을 잘 이해했어야 했다.

예를 들어, 나는 그냥 <bucket>/<converted_data>/<파일명.parquet>로 저장했지만, 이런식이면 athena에서 쿼리를 날릴 때 그 파일 안에 있는 모든 데이터를 다 조회해야 하기때문에 비용이 만만치 않다는 것이다. 따라서 나는 날짜별로 partition을 나누기로 했다.

처음에는 여전히 boto3를 이용해서 for문으로 저장할 때 각기다른 폴더에 저장될 수 있도록 하고싶었지만, 방법을 못 찾던 그 때, 활동하던 slack 채널에 질문을 올리고 그 답을 얻을 수 있었다.


한 줄기 빛 🌟

바로 aws wrangler라는 것을 이용하는 것이다.
공식문서

구글링을 하다가 기본원리는 boto3로 이해했기 때문에 실제로 어떻게 쓰는지 보고 싶어서 유튜브를 찾아본 결과 이 영상을 보고 감을 잡았다.

대략적인 구성은 아래와 같이 짜면 된다.

def csv_to_parquet (bucket_name, file_name) :
    
    # define variables
    raw_s3_bucket = bucket_name
    raw_path_dir = f"original_data/{file_name}"
    raw_path = f"s3://{raw_s3_bucket}/{raw_path_dir}"

    standardized_s3_bucket = bucket_name
    standard_path_dir = f"converted_data/{file_name}/"
    standardized_path = f"s3://{standardized_s3_bucket}/{standard_path_dir}"
    
    # extract csv file from s3
    df = wr.s3.read_csv(raw_path)

	# transform data
    # jupyter notebook 등을 이용해 본인이 원하는 형태로 데이터프레임 조정 
              
    # load parquet file to s3    
    partition = ["보통 데이터타입의 컬럼명"]
    wr.s3.to_parquet(df, path=standardized_path, dataset=True, partition_cols=partition)
        
        
    # setting catalog for gule/athena
    if file_name == "event_data" :
        wr.catalog.create_parquet_table(
                                        database = "데이터베이스명",
                                        table = "<테이블명>",
                                        path=standardized_path,
                                        columns_types = {...},
                                        partitions_types = {"server_datetime" : "string"},
                                        compression = "snappy",
                                        description = "test",
                                        columns_comments = {...}
                                        )
        
   
    # repair table                                
    wr.athena.repair_table(database = "<데이터베이스명>",
                           table = "<원하는 테이블명>",
                           s3_output = standardized_path)

이렇게 코드를 작성해주면 s3에서 데이터 extract 하고 원하는 형태로 transform하고 다시 s3에 parquet 포맷으로 load할 수 있다.

그 다음은 그냥 평범하게 athena가서 엄청난 속도에 놀라면서 쿼리를 날리면 된다😎

[참고사이트]
https://docs.aws.amazon.com/athena/latest/ug/partitions.html
https://aws-sdk-pandas.readthedocs.io/en/stable/stubs/awswrangler.s3.to_parquet.html
https://www.youtube.com/watch?v=NNrTJnojKww
https://www.youtube.com/watch?v=zFxfA10o2E4
https://jaemunbro.medium.com/aws-athena-presto-query-guide-886ce047d710
https://velog.io/@hsh/AWSPythonAthena-%ED%8C%8C%EC%9D%B4%EC%8D%AC%EC%9C%BC%EB%A1%9C-%EC%95%84%ED%85%8C%EB%82%98%EC%97%90-%EC%BF%BC%EB%A6%AC%ED%95%98%EA%B8%B0-boto3-vs-pyathena-vs-awswrangler
https://jsonobject.tistory.com/547
https://pearlluck.tistory.com/557
https://inpa.tistory.com/entry/AWS-%F0%9F%93%9A-Athena-%EC%84%B1%EB%8A%A5-%ED%96%A5%EC%83%81-TIP-%EB%8D%B0%EC%9D%B4%ED%84%B0-%EC%95%95%EC%B6%95-%ED%8C%8C%ED%8B%B0%EC%85%94%EB%8B%9D

profile
완료주의

0개의 댓글