Dataflow 템플릿 커스텀해보기

김민형·2022년 9월 29일
1

GCP - Data

목록 보기
13/44

Dataflow관련 실습을 진행할 때 Google에서 제공해주는 템플릿을 쓰는 거 아니면 그냥 특정 실습에서 쓰는 코드를 clone하여 명령어만 입력했다. 하지만 실습에서 쓰는 코드들은 항상 그 실습에만 적용되는 Transform이 포함되어 있다. 때문에 간단하고 기본적인 템플릿을 한 번 커스텀해봤다.

사전 세팅

  • 빅쿼리 데이터 세트 생성
  • timestamp, INT/FLOAT 유형의 컬럼, STRING 유형의 컬럼을 가지는 테이블 생성
  • Dataflow가 동작하는 스테이지의 파일을 저장할 Cloud Storage 버킷 생성

Python 환경 세팅

sudo apt-get update

# Python3이 설치되어 있다고 가정하고 아래 명령어들 시행
sudo apt-get install -y python3-venv
python3 -m venv <가상환경 이름>
source <가상환경 이름>/bin/activate

# 필요한 패키지들 설치
python3 -m pip install -q --upgrade pip setuptools wheel
python3 -m pip install apache-beam[gcp]

ps_to_bq.py

import argparse
import json
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.runners import DataflowRunner, DirectRunner

logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)

# On-prem 환경일 경우 import os를 한 후, 아래와 같이 서비스 계정 키 경로를 넣어줘야 한다.
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "<서비스 계정 경로>"

class CustomParsing(beam.DoFn):
    """ Custom ParallelDo class to apply a custom transformation """

    def to_runner_api_parameter(self, unused_context):
        # Not very relevant, returns a URN (uniform resource name) and the payload
        return "beam:transforms:custom_parsing:custom_v0", None

    def process(self, element: bytes, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
        """
        Simple processing function to parse the data and add a timestamp
        For additional params see:
        https://beam.apache.org/releases/pydoc/2.7.0/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn
        """
        parsed = json.loads(element.decode("utf-8"))
        # rfc3339란 날짜, 시간을 표현함에 있어 표준이 되는 포맷이다.
        parsed["timestamp"] = timestamp.to_rfc3339()
        yield parsed


def run():
    # Parsing arguments
    # project, region, staging_location, temp_location은 필수로 지정해줘야 하는 항목이다.
    parser = argparse.ArgumentParser()
    parser.add_argument("--project",required=True,help="GCP 프로젝트 ID",)
    parser.add_argument("--region",required=True,help="GCP 리전",)
    parser.add_argument("--staging_location",required=True,help="Cloud Storage Bucket의 staging 폴더 경로",)
    parser.add_argument("--temp_location",required=True,help="Cloud Storage Bucket의 temp 폴더 경로",)
    parser.add_argument("--input_subscription",required=True,help="데이터를 전달할 Pub/Sub 주제의 구독 경로",)
    parser.add_argument("--output_table",required=True,help="빅쿼리 테이블 경로",)
    parser.add_argument("--output_schema",required=True,help="스키마 지정",)
    parser.add_argument("--runner",required=True,help="Dataflow Runner 지정",)
    known_args, pipeline_args = parser.parse_known_args()

    # Creating pipeline options
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(StandardOptions).streaming = True
    pipeline_options.view_as(GoogleCloudOptions).project = known_args.project
    pipeline_options.view_as(GoogleCloudOptions).region = known_args.region
    pipeline_options.view_as(GoogleCloudOptions).staging_location = known_args.staging_location
    pipeline_options.view_as(GoogleCloudOptions).temp_location = known_args.temp_location
    pipeline_options.view_as(StandardOptions).runner = known_args.runner

    # Defining our pipeline and its steps
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
                subscription=known_args.input_subscription, timestamp_attribute=None
            )
            | "CustomParse" >> beam.ParDo(CustomParsing())
            | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                known_args.output_table,
                schema=known_args.output_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                # create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                # write_disposition 옵션 = 기존 테이블에 데이터 추가
                # creat_disposition 옵션 = 빅쿼리에 테이블이 없다고 하면 여기서 명시해주는 스키마대로 필요하면 테이블을 생성
            )
        )


if __name__ == "__main__":
    run()

실행

python ps_to_bq.py --streaming \
--project=<프로젝트 ID> \
--region=<리전> \
--staging_location=gs://<버킷명>/staging \
--temp_location=gs://<버킷명>/temp \
--input_subscription=projects/<프로젝트 ID>/subscriptions/<구독명> \
--output_table=<프로젝트 ID>:<데이터세트명>.<테이블명> \
--output_schema="timestamp:TIMESTAMP,<숫자 변수>:FLOAT,<문자 변수>:STRING" \
--runner=DataflowRunner \
--job_name=<Dataflow Job 이름>

테스트

콘솔에서 Pub/Sub 주제에 직접 메시지를 게시해줄 수도 있지만 원하는 갯수만큼 랜덤하게 데이터를 생성해서 Pub/Sub으로 보낼 파이썬 파일을 만들 수 있다.

ps_publisher_emulator.py

import json
import time
from datetime import datetime
from random import random
from google.auth import jwt
from google.cloud import pubsub_v1

# --- Base variables and auth path
CREDENTIALS_PATH = "<GCP 서비스 계정  경로>"
PROJECT_ID = "<프로젝트 ID>"
TOPIC_ID = "<주제 이름>"
MAX_MESSAGES = <Pub/Sub으로 보내고 싶은 데이터 개수>

# --- PubSub Utils Classes
class PubSubPublisher:
    def __init__(self, credentials_path, project_id, topic_id):
        credentials = jwt.Credentials.from_service_account_info(
            json.load(open(credentials_path)),
            audience="https://pubsub.googleapis.com/google.pubsub.v1.Publisher"
        )
        self.project_id = project_id
        self.topic_id = topic_id
        self.publisher = pubsub_v1.PublisherClient(credentials=credentials)
        self.topic_path = self.publisher.topic_path(self.project_id, self.topic_id)

    def publish(self, data: str):
        result = self.publisher.publish(self.topic_path, data.encode("utf-8"))
        return result


# --- Main publishing script
# 설정해준 MAX_MESSAGES 갯수만큼 랜덤하게 메시지가 생성되어서 Pub/Sub으로 보내진다.
def main():
    i = 0
    publisher = PubSubPublisher(CREDENTIALS_PATH, PROJECT_ID, TOPIC_ID)
    while i < MAX_MESSAGES:
        data = {
            "<FLOAT 유형의 컬럼>": random(),
            "<STRING 유형의 컬럼>": f"Hi-{datetime.now()}"
        }
        publisher.publish(json.dumps(data))
        time.sleep(random())
        i += 1

if __name__ == "__main__":
    main()

실행

python ps_publisher_emulator.py
profile
Solutions Architect (rlaalsgud97@gmail.com)

0개의 댓글