Cloud Functions - GCS folder trigger

김민형·2022년 12월 15일
0

GCP - Data

목록 보기
18/43

GCS to BigQuery 파이프라인 구축에서 이어지는 내용.

앞의 포스팅과 같은 구조에서 'Cloud Storage에 지속적으로 쌓이는 csv파일들이 한 테이블에 계속해서 쌓이게끔 해줄 수는 없는가?'와 같은 요구조건이 있을 수 있다.
ex) 한 달치 데이터가 계속해서 쌓이는데 6개월, 1년치 데이를 한 테이블에서 볼 수 있게끔 등등

이럴땐 폴더 단위로 트리거를 걸 수 있으면 좋은데 보다시피 콘솔에서 Cloud Storage에 트리거를 거는 것은 버킷 단위만 되고 폴더, 파일 단위는 되지 않는다.

하지만 코드를 통해 구현할 수 있다.

Cloud Storage는 Object Storage이므로 저장되는 객체들이 단순히 콘솔에서 보면 hierarchical한 구조로 보이지만 사실은 flat한 구조이다.
객체가 저장되는 것은 사실 버킷에 '폴더1/폴더2/파일이름' 이런 이름으로 저장되는 것.

때문에 data['name']을 통해 파일 이름을 불러오면 특정 폴더에 속해있을 경우 사실 파일 이름은 아래와 같이 불러와진다.

때문에 /를 기준으로 파일 이름을 나누고 폴더명으로 테이블이 생성되게끔 설정해줬다.

main.py

from google.cloud import bigquery

def import_to_bigquery2(data, context):
    client = bigquery.Client()
    project_id = '<프로젝트 ID>'
    dataset_id = '<데이터 세트 >'

    bucket_name = data['bucket']
    file_name = data['name']

    # /를 기준으로 폴더와 파일 분리
    file_name = file_name.split('/')

    # 폴더명을 테이블명으로 지정
    table_id = file_name[-2]
    
    print(file_name)

    file_ext = file_name[-1].split('.')[-1]

    if file_ext == 'csv':
        uri = 'gs://' + bucket_name + '/' + data['name']
        dataset_ref = client.dataset(dataset_id)
        table_ref = dataset_ref.table(table_id)

        #load config
        job_config = bigquery.LoadJobConfig()
        job_config.autodetect = True
        job_config.schema_update_options = [
            bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
        ]
        job_config.create_disposition = [
            bigquery.CreateDisposition.CREATE_IF_NEEDED
        ]

        #load data
        load_job = client.load_table_from_uri(
            uri,
            table_ref,
            job_config=job_config
        )
        print('Started job {}'.format(load_job.job_id))
        load_job.result()
        print('Job finished.')
        destination_table = client.get_table(dataset_ref.table(table_id))
        print('Loaded {} rows.'.format(destination_table.num_rows))

    else:
        print('Nothing To Do')

폴더를 만들고 csv파일을 하나 업로드

폴더명으로 테이블이 생성된 것 확인(222876행)

코드에서 중간에 file_name을 print 해줬다.
Cloud Functions 로그를 보면 /를 기준으로 파일 이름이 분리된 형태를 알 수 있고 빅쿼리에 잘 적재된 것도 확인할 수 있다.

이제 같은 폴더에 csv파일을 하나 더 올려주자.

행이 하나 늘어난 것을 확인할 수 있다.

사실 버킷에서 년단위 or 월단위 데이터들을 따로 폴더로 구분해줄 필요가 없다고 한다면 시간 관련 라이브러리를 사용하여 간단하게 구현할 수 있다.

import datetime
    
today = datetime.date.today()

#연 단위일 경우
y = str(today.year)
table_id = '<테이블 ID>_' + y

#월 단위일 경우
m = str(today.month)
table_id = '<테이블 ID>_' + y + '_' + m

이러면 같은 년도 or 같은 월에 들어오는 데이터들은 자동으로 한 테이블에 쌓이게 된다.

profile
Solutions Architect (rlaalsgud97@gmail.com)

0개의 댓글