Task를 저장할 DAG 개체가 필요
여기서는 DAG의 고유 식별자 역할을 하는 dag_id를 정의하는 문자열을 전달
예제에서는 default args를 전달하고 DAG에 대한 1일 간격의 schedule을 정의
with DAG(
# DAG를 식별하는 DAG_ID 전달
"tutorial",
# default args 전달
default_args={
"depends_on_past": False,
"email": ["airflow@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
},
description="A simple tutorial DAG",
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
Airflow는 하나의 DAG에 대한 identity를 dag_id와 start_date로 구분
실제 MetaDB의 unique key는 dag_id이지만, 한 번 로드된(실행된) start_date는 변경할 수 없음
스케줄에 대한 상세 조건은 변경 가능함
따라서 start_date를 지정하기 전에 airflow test나 backfill API로 부분적으로 수행 테스트를 한 뒤에 최종 start_date를 정하는 것이 좋음
start_date를 변경해야한다면, dag_id를 변경해서 새로운 dag로 정의해야 함
timedelta, cron expresstion, cron preset 이용 가능
cron preset은 @daily처럼 human-readable하게 cron expression을 쓸 수 있게 해 놓은 것임
star_date : DAG의 스케줄링을 시작하는 날짜. 해당 날짜부터 schedul에 선언된 간격으로 DAG를 trigger
end_date : DAG의 스케줄링을 종료하는 날짜. 해당 날짜부터 해당 DAG는 스케줄링하지 않음
logical_date(execution_date) : DAG가 실제로 trigger된 개별 시간. backfill로 수행하더라도 항상 같은 시간을 유지함
backfill : 이전 날짜에 수행된 workflow를 다시 수행하는 것
실제 task 안에서 {{ds}}, {{ts}} 등의 변수로 활용
Start Date(in task 수행결과 log) : 실제 worker에서 수행을 시작한 시간
End Date(in task 수행결과 log) : 실제 worker에서 수행이 끝난 시간
arguments의 catchup