Jinja template {{ }} 을 통해 런타임에 변수 삽입
template_fields 속성으로 허용 가능한 리스트 설정ex) BashOperator 를 통해 wikipedia PV 다운로드하기
bash_command 로 실행할 bash command 지정import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
dag = DAG(
dag_id="listing_4_01",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@hourly",
)
get_data = BashOperator(
task_id="get_data",
bash_command=(
"curl -o /tmp/wikipageviews.gz "
"https://dumps.wikimedia.org/other/pageviews/"
"{{ execution_date.year }}/"
"{{ execution_date.year }}-{{ '{:02}'.format(execution_date.month) }}/"
"pageviews-{{ execution_date.year }}"
"{{ '{:02}'.format(execution_date.month) }}"
"{{ '{:02}'.format(execution_date.day) }}-"
"{{ '{:02}'.format(execution_date.hour) }}0000.gz"
),
dag=dag,
)
Pendulum 의 datetime 객체 사용datetime과 호환execution_date 설정/객체 관련
config: Airflow config 접근dag, dag_run: 현재 DAG, DAGRun 객체task: 현재 operator (`PythonOperator)task_instance : 현재 TaskInstance 객체 (TaskInstance)test_mode: test mode 로 실행중인지 여부 (True/False)시간 관련
execution_date: task 실행시간 (Datetime)ds, ds_nodash : %Y-%m-%d, %Y%m%d 포맷의 execution_date 문자열next_execution_date : 다음 스케줄 간격의 execution_datenext_ds, next_ds_nodash: %Y-%m-%d, %Y%m%d 포맷 문자열prev_execution_date : 이전 스케줄 간격의 execution_dateprev_ds, prev_ds_nodash: %Y-%m-%d, %Y%m%d 포맷 문자열tomorrow_ds, yesterday_ts : 1일 전/후 %Y-%m-%d 포맷 문자열tomorrow_ds_nodash, yesterday_ts_nodash : 1일 전/후 %Y%m%d 포맷 문자열import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator
dag = DAG(
dag_id="listing_4_03",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@daily",
)
def _print_context(**kwargs):
print(kwargs)
print_context = PythonOperator(
task_id="print_context", python_callable=_print_context, dag=dag
)
PythonOperator 의 경우 python_callable 을 사용하여 별도로 runtime context 적용BashOperator 는 문자열 제공)PythonOperator 로 wikipedia PV 다운로드하기_get_data 함수를 정의하고, python_callable 인자로 넘겨준다from urllib import request
import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator
dag = DAG(
dag_id="listing_4_05",
start_date=airflow.utils.dates.days_ago(1),
schedule_interval="@hourly",
)
# callable 정의
def _get_data(execution_date):
year, month, day, hour, *_ = execution_date.timetuple()
url = (
"https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
output_path = "/tmp/wikipageviews.gz"
request.urlretrieve(url, output_path)
get_data = PythonOperator(task_id="get_data", python_callable=_get_data, dag=dag)
**context 로 지정하여 task context 에 접근할 수 있다.import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator
dag = DAG(
dag_id="listing_4_07",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@daily",
)
# **context 로 kwargs 를 지정하면 task context 로 변경
def _print_context(**context):
print(context)
# context 를 통해 execution_date 등 접근 가능
print(context["execution_date"]
print(context["next_execution_date"])
print_context = PythonOperator(
task_id="print_context", python_callable=_print_context, dag=dag
)
**context 에서는 제외됨def _get_data(execution_date, **context):
year, month, day, hour, *_ = execution_date.timetuple()
op_args 사용하는 방식
# output_path 를 별도로 받는 callable 정의
def _get_data(output_path, **context):
pass
get_data = PythonOperator(
task_id="get_data",
python_callable=_get_data,
# op_args 로 variable 제공
op_args=["/tmp/wikipageviews.gz"],
dag=dag,
)
op_kargs 사용하는 방식# (전략)
def _get_data(year, month, day, hour, output_path, **_):
url = (
"https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
request.urlretrieve(url, output_path)
get_data = PythonOperator(
task_id="get_data",
python_callable=_get_data,
# 키워드 인수로 전달
# callable 에서 datetime 을 추출하는 대신 template 을 통해 전달 가능
op_kwargs={
"year": "{{ execution_date.year }}",
"month": "{{ execution_date.month }}",
"day": "{{ execution_date.day }}",
"hour": "{{ execution_date.hour }}",
"output_path": "/tmp/wikipageviews.gz",
},
dag=dag,
)
Airflow UI 를 통해 검사하기
cli 에서 검사하기
airflow tasks render <dag id> <task id> <원하는 execution date>