Operator는 airflow가 완료될 task 단위를 정의
모든 Operator는 Base Operator에서 상속, 여기에는 task를 실행하는 데 필요한 모든 인수(ex. logical_date)가 포함됨
각 Operator는 완료 중인 task 유형에 대한 고유한 인수를 포함. 주요 Operator로는 PythonOperator, BashOperator, KubernetesPodOperator 등이 존재
Airflow는 사용자가 Operator에게 전달하는 인수를 기반으로 task를 완료. 이전에 살펴본 tutorial에서는 BashOperator를 이용하여 몇 가지 bash 스크립트를 실행한 것
DAG에서 Operator를 사용하려면 Task로 인스턴스화해야 함
Task는 DAG 컨텍스트 내에서 Operator의 Task를 실행하는 방법을 결정
이전 예제를 살펴보면, 두 개의 별도 bash script를 실행하기 위해 BashOperator를 두 개의 별도 task로 인스턴스화 하는 것을 알 수 있음
이때 'task_id'는 task의 고유 식별자 역할을 함(Web-UI에 표시되는 항목임)
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
# Overriding Default Argument
retries=3,
)
Task Argument의 우선순위
명시적으로 전달된 argument
'default_args' dictionary에 존재하는 값
operator의 default 값(존재하는 경우)
Airflow는 Jinja Template의 기능으로 파이프라인 작성자에게 일련의 기본 제공 매개변수 및 매크로를 제공
자체 매개변수, 메크로 및 템플릿을 정의할 수 있음
아래에서 {{ ds }}는 today's "date stamp"
templated_command = dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id="templated",
depends_on_past=False,
bash_command=templated_command,
)
Jinja documentation(Expressions)
Airflow 기본 제공하는 variable과 macro 템플릿
DAG는 Markdown만 지원
Tasks는 일반 텍스트, Markdown, reStructuredText, json 및 yaml을 지원
t1.doc_md = dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.

**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
# [END documentation]
t1.set_downstream(t2)
t2.set_upstream(t1)
t1 >> t2
t2 << t1
t1 >> t2 >> t3
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
이 때 순환구조를 가지면 에러 발생
Web-UI의 Graph 항목에서 확인할 수 있음
Script validation
tutorial 코드를 task_id만 tutorial_example로 바꿔 tutorial_example.py로 /usr/airflow/dags 경로에 생성하여 테스트 진행
문법 오류 등의 test를 하는 과정은 단순히 python script를 실행시켜보면됨
docker 내부 python 버전과 다를 것이므로 docker compose 명령을 통해 실행
mount된 도커 내부 경로는 /opt/airflow/dags 임
docker compose run airflow-worker python /opt/airflow/dags/tutorial_example.py
docker compose run airflow-worker airflow db init
docker compose run airflow-worker airflow dags list
docker compose run airflow-worker airflow tasks list tutorial_example
docker compose run airflow-worker airflow tasks list tutorial_example --tree
Testing
airflow tasks test 명령은 작업 인스턴스를 로컬로 실행하고 실행로그를 stdout으로 출력
종속성과 무관하게 상태(실행, 성공, 실패 등)을 데이터베이스에 전달하지 않음
airflow dags test도 마찬가지지만 DAG level에서 작동
특정 날짜의 task instance를 실행시키고 싶을 때 logical date를 지정해서 test할 수도 있음
# command subcommand [dag_id] [task_id] [(optional) date]
# testing print_date
docker compose run airflow-worker airflow tasks test tutorial_example print_date 2022-06-01
# testing sleep
docker compose run airflow-worker airflow tasks test tutorial_example sleep 2022-06-01
# testing templated
docker compose run airflow-worker airflow tasks test tutorial_example templated 2022-06-01
docker compose run airflow-worker airflow dags test tutorial_example 2022-06-01