[Data Pipelines] Airflow 콘텍스트를 사용하여 태스크 템플릿 작업하기

jake·2022년 8월 24일
0

Airflow로 처리할 데이터 검사하기

증분 데이터를 적재하는 방법 결정하기

어떤 종류의 데이터로 작업을 하든지 zip파일 다운로드, 압축풀기, 데이터 추출
이 과정은 필수적인 사항이다.
크고 작은 모든 데이터는 구조가 복잡할 수 있으며 파이프라인을 구축하기 전에 접근 방식에 대한 기술적 계획을 세우는 것이 중요하다.
데이터 파이프라인을 개발하기 위해서는 데이터를 증분 방식으로 적재하는 방법과 데이터를 다루는 방법을 이해해야 한다.



태스크 콘텍스트와 Jinja 템플릿 작업

위키피디아 페이지 뷰 수를 가져오는 DAG의 첫 번째 버전을 만들어 보자.
데이터를 다운로드하고, 추출하고, 읽는 것으로 먼저 시작한다.

첫 번째 단계는 주기마다 압축 파일을 다운로드하는 것이다.
URL은 다양한 날짜 및 시간 구성 요소로 구성된다.
http://dumps.wikimedia.org/other/pageviews/{year}/{year}-{month}/pageviews-{year}{month}{day}{hour}0000.gz

모든 주기에 대해 URL에 특정 주기의 날짜와 시간을 입력해야 한다.
페이지 뷰를 다운로드하는 방법에는 여러 가지가 있는데 지금은 BashOperator와 PythonOperator를 사용하자.

오퍼레이터의 인수 템플릿 작업

먼저 BashOperator를 사용하여 위키피디아 페이지 뷰를 다운로드하자.

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    dag_id="listing_4_01",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@hourly",
)

get_data = BashOperator(
    task_id="get_data",
    bash_command=(
        "curl -o /tmp/wikipageviews.gz "
        "https://dumps.wikimedia.org/other/pageviews/"
        "{{ execution_date.year }}/" #이중 중괄호는 런타임 시에 삽입될 변수를 나타낸다.
        "{{ execution_date.year }}-{{ '{:02}'.format(execution_date.month) }}/"
        "pageviews-{{ execution_date.year }}"
        "{{ '{:02}'.format(execution_date.month) }}"
        "{{ '{:02}'.format(execution_date.day) }}-"
        "{{ '{:02}'.format(execution_date.hour) }}0000.gz" # 모든 파이썬 변수 또는 표현식에 대해 제공할 수 있다.
    ),
    dag=dag,
)

execution_date는 작업 런타임 시에 마법처럼 사용할 수 있는 변수 중 하나이다.
이중 중괄호는 Jinja 템플릿 문자열을 나타내고 Jinja는 런타임 시에 템플릿 문자열의 변수와 and 및 or 표현식을 대체하는 템플릿 엔진이다.
템플릿 작성은 프로그래머로서 코드 작성 시점에는 값을 알기 어렵지만 런타임 시에 값을 할당하기 위해 사용한다.

사용자가 런타임 시에 값을 입력하기 때문에 프로그래밍할 때에는 값을 알 수 없다. 우리가 아는 것은 삽입된 값이 name이라는 변수에 할당된 다음, 런타임 시에 값을 할당하고 삽입 하기 위해 템플릿 문자열을 제공한다는 것이다.

 

템플릿에 무엇이 사용 가능할까

템플릿화를 위해 사용할 수 있는 변수는 무엇이 있을까.
이전에 사용한 execution_date뿐만 아니라 더 많은 변수를 사용할 수 있다.
PythonOperator의 도움으로 전체 태스크 콘텍스트를 출력하여 검사할 수 있다.

여기서 정리하기에는 너무 많으니 알아서 찾아보자.

 

PythonOperator 템플릿

PythonOperator는 이전에 설명한 BashOperator와는 차이가 있다.
BashOperator를 사용하여 런타임에 자동으로 템플릿이 지정되는 bash_commad 인수에 문자열을 제공한다.
PythonOperator는 런타임 콘텍스트로 템플릿화할 수 있는 인수를 사용치 않고 별도로 런타임 콘텍스트를 적용할 수 있는 python_collable 인수를 사용하기 때문에 이 표준을 따르지 않는다.

이전에 사용한 위키피디아 페이지 뷰를 다운로드하는 코드를 PythonOperator로 구현된 코드를 확인해보자. 기능적으로 동일한 동작을 수행한다.

from urllib import request

import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="listing_4_05",
    start_date=airflow.utils.dates.days_ago(1),
    schedule_interval="@hourly",
)


def _get_data(execution_date):
    year, month, day, hour, *_ = execution_date.timetuple()
    url = (
        "https://dumps.wikimedia.org/other/pageviews/"
        f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
    )
    output_path = "/tmp/wikipageviews.gz"
    request.urlretrieve(url, output_path)


get_data = PythonOperator(task_id="get_data", python_callable=_get_data, dag=dag)

파이썬에서 함수는 주요 요소이며, PythonOperator의 python_collable 인수에 callable을 제공한다.
어떤 함수라도 실행 시 PythonOperator는 호출 가능하도록 수행한다.

 

파이썬은 함수에서 키워드 인수를 받을 수 있는데 키워드 인수를 사전에 알지 못하는 경우, 예상되는 키워드 인수를 모두 명시적으로 작성할 필요 없는 다양한 사례가 있다.

def _print_context(**kwargs): # 키워드 인수는 두 개의 애스터리스크(**)로 표시하면 캡처된다.
                              # 그리고 캡처 인수의 이름을 kwrgs에 저장한다.
    print(kwargs)

키워드 인수의 이름 지정은 Airflow 태스크 콘텍스트 변수를 캡쳐할 의도를 나타내기 위해서 적절한 이름을 사용하는 것이 좋다.

import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="listing_4_07",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@daily",
)


def _print_context(**context):
    print(context)


print_context = PythonOperator(
    task_id="print_context", python_callable=_print_context, dag=dag
)

콘텍스트 변수는 모든 콘텍스트 변수의 집합이며 현재 실행되는 태스크의 시작 및 종료 날짜 시간 인쇄와 같이 태스크 실행 간격에 대해 다양한 동작을 제공할 수 있다.

0개의 댓글