Jinja template {{ }}
을 통해 런타임에 변수 삽입
template_fields
속성으로 허용 가능한 리스트 설정ex) BashOperator
를 통해 wikipedia PV 다운로드하기
bash_command
로 실행할 bash command 지정import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
dag = DAG(
dag_id="listing_4_01",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@hourly",
)
get_data = BashOperator(
task_id="get_data",
bash_command=(
"curl -o /tmp/wikipageviews.gz "
"https://dumps.wikimedia.org/other/pageviews/"
"{{ execution_date.year }}/"
"{{ execution_date.year }}-{{ '{:02}'.format(execution_date.month) }}/"
"pageviews-{{ execution_date.year }}"
"{{ '{:02}'.format(execution_date.month) }}"
"{{ '{:02}'.format(execution_date.day) }}-"
"{{ '{:02}'.format(execution_date.hour) }}0000.gz"
),
dag=dag,
)
Pendulum
의 datetime
객체 사용datetime
과 호환execution_date
설정/객체 관련
config
: Airflow config 접근dag
, dag_run
: 현재 DAG, DAGRun 객체task
: 현재 operator (`PythonOperator)task_instance
: 현재 TaskInstance 객체 (TaskInstance
)test_mode
: test mode 로 실행중인지 여부 (True
/False
)시간 관련
execution_date
: task 실행시간 (Datetime
)ds
, ds_nodash
: %Y-%m-%d
, %Y%m%d
포맷의 execution_date
문자열next_execution_date
: 다음 스케줄 간격의 execution_date
next_ds
, next_ds_nodash
: %Y-%m-%d
, %Y%m%d
포맷 문자열prev_execution_date
: 이전 스케줄 간격의 execution_date
prev_ds
, prev_ds_nodash
: %Y-%m-%d
, %Y%m%d
포맷 문자열tomorrow_ds
, yesterday_ts
: 1일 전/후 %Y-%m-%d
포맷 문자열tomorrow_ds_nodash
, yesterday_ts_nodash
: 1일 전/후 %Y%m%d
포맷 문자열import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator
dag = DAG(
dag_id="listing_4_03",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@daily",
)
def _print_context(**kwargs):
print(kwargs)
print_context = PythonOperator(
task_id="print_context", python_callable=_print_context, dag=dag
)
PythonOperator
의 경우 python_callable
을 사용하여 별도로 runtime context 적용BashOperator
는 문자열 제공)PythonOperator
로 wikipedia PV 다운로드하기_get_data
함수를 정의하고, python_callable
인자로 넘겨준다from urllib import request
import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator
dag = DAG(
dag_id="listing_4_05",
start_date=airflow.utils.dates.days_ago(1),
schedule_interval="@hourly",
)
# callable 정의
def _get_data(execution_date):
year, month, day, hour, *_ = execution_date.timetuple()
url = (
"https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
output_path = "/tmp/wikipageviews.gz"
request.urlretrieve(url, output_path)
get_data = PythonOperator(task_id="get_data", python_callable=_get_data, dag=dag)
**context
로 지정하여 task context
에 접근할 수 있다.import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator
dag = DAG(
dag_id="listing_4_07",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@daily",
)
# **context 로 kwargs 를 지정하면 task context 로 변경
def _print_context(**context):
print(context)
# context 를 통해 execution_date 등 접근 가능
print(context["execution_date"]
print(context["next_execution_date"])
print_context = PythonOperator(
task_id="print_context", python_callable=_print_context, dag=dag
)
**context
에서는 제외됨def _get_data(execution_date, **context):
year, month, day, hour, *_ = execution_date.timetuple()
op_args
사용하는 방식
# output_path 를 별도로 받는 callable 정의
def _get_data(output_path, **context):
pass
get_data = PythonOperator(
task_id="get_data",
python_callable=_get_data,
# op_args 로 variable 제공
op_args=["/tmp/wikipageviews.gz"],
dag=dag,
)
op_kargs
사용하는 방식# (전략)
def _get_data(year, month, day, hour, output_path, **_):
url = (
"https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
request.urlretrieve(url, output_path)
get_data = PythonOperator(
task_id="get_data",
python_callable=_get_data,
# 키워드 인수로 전달
# callable 에서 datetime 을 추출하는 대신 template 을 통해 전달 가능
op_kwargs={
"year": "{{ execution_date.year }}",
"month": "{{ execution_date.month }}",
"day": "{{ execution_date.day }}",
"hour": "{{ execution_date.hour }}",
"output_path": "/tmp/wikipageviews.gz",
},
dag=dag,
)
Airflow UI 를 통해 검사하기
cli 에서 검사하기
airflow tasks render <dag id> <task id> <원하는 execution date>