Airflow Python 스크립트가 실제로는
DAG의 구조를 코드로 지정하는 구성 파일일 뿐
DAG 정의 파일을 실제 데이터 처리를 수행할 수 있는 장소로 생각하지만
전혀 그렇지 않습니다.
import textwrap
from datetime import datetime, timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
with DAG(
"tutorial",
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"email": ["airflow@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'trigger_rule': 'all_success'
},
description="A simple tutorial DAG",
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
t1.doc_md = textwrap.dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
templated_command = textwrap.dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id="templated",
depends_on_past=False,
bash_command=templated_command,
)
t1 >> [t2, t3]
Direct Acycled Graph
- 관계와 의존성(Dependencies)을 가진 작업(Task)들의 집합
DAG안에 들어갈 요소들을 Key-Value 형식으로 지정해 넣을 수 있다.
# 이 인수들은 각 연산자에 전달된다.
# 연산자 초기화 중에 Task별로 이를 재정의할 수 있다.
default_args={
"depends_on_past": False,
"email": ["airflow@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'trigger_rule': 'all_success'
},
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
Operator
는 Airflow가 완료할 작업 단위를 정의
즉, 특정 행위를 할 수 있는 기능을 모아 놓은 클래스 - 설계도
Operator를 사용하는 것은,
DAG 코드로 Task 의존성을 시각화하는 것을 돕는다.
PythonOperator
,BashOperator
,KubernetesPodOperator
가 매우 많이 사용된다.
Operator에서 객체화되어 DAG에서 실행 가능한 오브젝트
DAG에서 Operator를 사용하기 위해,
Task
로 인스턴스화 필요
Task가 DAG에서 Operator가 어떻게 실행될 지 결정한다.
t1.set_downstream(t2)
# 이것은 t2가 t1이 성공적으로 실행되어야 실행됨을 의미합니다.
# 다음과 동일합니다:
t2.set_upstream(t1)
# 비트 시프트 연산자는 작업을 연쇄시키는 데 사용할 수 있습니다:
t1 >> t2
# 그리고 비트 시프트 연산자로 상위 의존성을 설정합니다:
t2 << t1
# 여러 종속성을 연결하는 것은 비트 시프트 연산자로 간결해집니다:
t1 >> t2 >> t3
# 작업 목록을 종속성으로 설정할 수도 있습니다. 이러한 작업들은 동일한 효과를 갖습니다:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1