Airflow DAG 살펴보기

Alan·2023년 3월 23일
0

Airflow 맛보기

목록 보기
4/7

Instantiate a DAG

  • Task를 저장할 DAG 개체가 필요

  • 여기서는 DAG의 고유 식별자 역할을 하는 dag_id를 정의하는 문자열을 전달

  • 예제에서는 default args를 전달하고 DAG에 대한 1일 간격의 schedule을 정의


with DAG(
	# DAG를 식별하는 DAG_ID 전달
    "tutorial",
	# default args 전달
	default_args={
        "depends_on_past": False,
        "email": ["airflow@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function,
        # 'on_success_callback': some_other_function,
        # 'on_retry_callback': another_function,
        # 'sla_miss_callback': yet_another_function,
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

DAG 인식

  • Airflow는 하나의 DAG에 대한 identity를 dag_id와 start_date로 구분

  • 실제 MetaDB의 unique key는 dag_id이지만, 한 번 로드된(실행된) start_date는 변경할 수 없음

  • 스케줄에 대한 상세 조건은 변경 가능함

  • 따라서 start_date를 지정하기 전에 airflow test나 backfill API로 부분적으로 수행 테스트를 한 뒤에 최종 start_date를 정하는 것이 좋음

  • start_date를 변경해야한다면, dag_id를 변경해서 새로운 dag로 정의해야 함

Sechdule

  • timedelta, cron expresstion, cron preset 이용 가능

  • cron preset은 @daily처럼 human-readable하게 cron expression을 쓸 수 있게 해 놓은 것임

start_date, end_date, logical_date(execution_date)

  • star_date : DAG의 스케줄링을 시작하는 날짜. 해당 날짜부터 schedul에 선언된 간격으로 DAG를 trigger

  • end_date : DAG의 스케줄링을 종료하는 날짜. 해당 날짜부터 해당 DAG는 스케줄링하지 않음

  • logical_date(execution_date) : DAG가 실제로 trigger된 개별 시간. backfill로 수행하더라도 항상 같은 시간을 유지함

    • backfill : 이전 날짜에 수행된 workflow를 다시 수행하는 것

    • 실제 task 안에서 {{ds}}, {{ts}} 등의 변수로 활용

  • Start Date(in task 수행결과 log) : 실제 worker에서 수행을 시작한 시간

  • End Date(in task 수행결과 log) : 실제 worker에서 수행이 끝난 시간

  • arguments의 catchup

    • True로 설정하면, 설정된 start_date부터 현재까지의 Task를 일자별로 수행하여 과거 데이터를 채움

0개의 댓글