[Airflow] Jinja template, python callable template

Woong·2024년 10월 7일
0

Apache Airflow

목록 보기
3/9

Jinja template

  • Jinja template {{ }} 을 통해 런타임에 변수 삽입

    • operator 의 template_fields 속성으로 허용 가능한 리스트 설정
  • ex) BashOperator 를 통해 wikipedia PV 다운로드하기

    • bash_command 로 실행할 bash command 지정
    • Jinja template 을 통해 bash 에 문자열 제공
import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    dag_id="listing_4_01",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@hourly",
)

get_data = BashOperator(
    task_id="get_data",
    bash_command=(
        "curl -o /tmp/wikipageviews.gz "
        "https://dumps.wikimedia.org/other/pageviews/"
        "{{ execution_date.year }}/"
        "{{ execution_date.year }}-{{ '{:02}'.format(execution_date.month) }}/"
        "pageviews-{{ execution_date.year }}"
        "{{ '{:02}'.format(execution_date.month) }}"
        "{{ '{:02}'.format(execution_date.day) }}-"
        "{{ '{:02}'.format(execution_date.hour) }}0000.gz"
    ),
    dag=dag,
)
  • 날짜를 나타내기 위해 Pendulumdatetime 객체 사용
    • 네이티브 Python 의 datetime과 호환
    • ex) execution_date

Task Context variables

  • 주요 task context variable 정리
    • 전체 variable 은 Docs 참고
  • 설정/객체 관련

    • config: Airflow config 접근
    • dag, dag_run: 현재 DAG, DAGRun 객체
    • task: 현재 operator (`PythonOperator)
    • task_instance : 현재 TaskInstance 객체 (TaskInstance)
    • test_mode: test mode 로 실행중인지 여부 (True/False)
  • 시간 관련

    • execution_date: task 실행시간 (Datetime)
      • ds, ds_nodash : %Y-%m-%d, %Y%m%d 포맷의 execution_date 문자열
    • next_execution_date : 다음 스케줄 간격의 execution_date
      • next_ds, next_ds_nodash: %Y-%m-%d, %Y%m%d 포맷 문자열
    • prev_execution_date : 이전 스케줄 간격의 execution_date
      • prev_ds, prev_ds_nodash: %Y-%m-%d, %Y%m%d 포맷 문자열
    • tomorrow_ds, yesterday_ts : 1일 전/후 %Y-%m-%d 포맷 문자열
      • tomorrow_ds_nodash, yesterday_ts_nodash : 1일 전/후 %Y%m%d 포맷 문자열
  • ex) 전체 task context 확인
import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="listing_4_03",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@daily",
)


def _print_context(**kwargs):
    print(kwargs)


print_context = PythonOperator(
    task_id="print_context", python_callable=_print_context, dag=dag
)

PythonOperator Template

  • PythonOperator 의 경우 python_callable 을 사용하여 별도로 runtime context 적용
    • 위에서 정리한 runtime comtext variable 은 사용하지 않는다.
    • 문자열이 아닌 Python callable 을 제공
      • (BashOperator 는 문자열 제공)
  • ex) PythonOperator 로 wikipedia PV 다운로드하기
    • _get_data 함수를 정의하고, python_callable 인자로 넘겨준다
from urllib import request

import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="listing_4_05",
    start_date=airflow.utils.dates.days_ago(1),
    schedule_interval="@hourly",
)

# callable 정의
def _get_data(execution_date):
    year, month, day, hour, *_ = execution_date.timetuple()
    url = (
        "https://dumps.wikimedia.org/other/pageviews/"
        f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
    )
    output_path = "/tmp/wikipageviews.gz"
    request.urlretrieve(url, output_path)


get_data = PythonOperator(task_id="get_data", python_callable=_get_data, dag=dag)
  • **context 로 지정하여 task context 에 접근할 수 있다.
import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="listing_4_07",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@daily",
)


# **context 로 kwargs 를 지정하면 task context 로 변경
def _print_context(**context):
    print(context)
    # context 를 통해 execution_date 등 접근 가능
    print(context["execution_date"]
    print(context["next_execution_date"])


print_context = PythonOperator(
    task_id="print_context", python_callable=_print_context, dag=dag
)
  • 명시적으로 context variable 을 지정하는 것도 가능
    • 이 경우 **context 에서는 제외됨
def _get_data(execution_date, **context):
    year, month, day, hour, *_ = execution_date.timetuple()

PythonOperator 에서 callable custom variable 제공

  • op_args 사용하는 방식
    • 리스트로 전달, 순서대로 지정

# output_path 를 별도로 받는 callable 정의
def _get_data(output_path, **context):
    pass

get_data = PythonOperator(
    task_id="get_data",
    python_callable=_get_data,
    # op_args 로 variable 제공
    op_args=["/tmp/wikipageviews.gz"],
    dag=dag,
)
  • op_kargs 사용하는 방식
    • 키워드 인수로 전달
# (전략)

def _get_data(year, month, day, hour, output_path, **_):
    url = (
        "https://dumps.wikimedia.org/other/pageviews/"
        f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
    )
    request.urlretrieve(url, output_path)


get_data = PythonOperator(
    task_id="get_data",
    python_callable=_get_data,
    # 키워드 인수로 전달
    # callable 에서 datetime 을 추출하는 대신 template 을 통해 전달 가능
    op_kwargs={
        "year": "{{ execution_date.year }}",
        "month": "{{ execution_date.month }}",
        "day": "{{ execution_date.day }}",
        "hour": "{{ execution_date.hour }}",
        "output_path": "/tmp/wikipageviews.gz",
    },
    dag=dag,
)

template variable 검사하기

  • Airflow UI 를 통해 검사하기

    • 각 task instance 별 렌더링된 속성 표시
      • job 을 스케줄해야 검사 가능
  • cli 에서 검사하기

    • job 실행하지 않고 확인 가능
    • 모든 template 속성 확인 가능

airflow tasks render <dag id> <task id> <원하는 execution date>

reference

  • 서적 Apache Airflow 기반의 데이터 파이프라인
  • Github 예제

0개의 댓글