[Airflow] dag실행 중 에러가 발생했을 때 다음 dag run을 실행하지 않기

owlur·2021년 8월 3일
2

airflow

목록 보기
1/2

airflow에서 DAG를 구성하다보면 dag가실행중 에러가 나면 그다음 dag run을 실행하지 않도록 구성하고 싶을 때가 있다

가장 간단한 방법은 태스크에 depends_on_past , wait_for_downstream 설정을 True로 넣어주는 방법이다

args = {
    'owner': 'my',
    'depends_on_past': True,  # 이전 dag run에서 현재 task가 실패했으면 실행하지 않음
    'wait_for_downstream': True # 이전 dag run에서 현재 task의 downstream이 실패했으면 실행하지 않음
}

dag = DAG(
        dag_id='DAG ID',
        default_args=args,
        start_date=days_ago(1),
        schedule_interval=timedelta(minutes=30),
        max_active_runs=1
)

하지만 이 방법은 task의 바로 아래에 붙어있는(downstream)이 실패했을 때만 그 task가 실행되지 않으므로 일단 dag가 실행은 되게 된다

그래서 dag 전체를 실행하지 않으려면 ExternalTaskSensor 를 사용해야 한다.

DAG의 가장 앞에 ExternalTaskSensor 를 만들고 DAG의 마지막에 dummy_operator 를 하나 만든다음 sensor에서 이 마지막 dummy_operator 의 성공여부를 센싱하도록 하면 앞의 dag run이 성공 했을 때만 실행하도록 할 수 있다

DAG의 catchup 설정에 따라 사용법이 조금 달라진다

catchup = True 일 때

    start_sensor = ExternalTaskSensor(
        task_id='start_sensor',
        external_dag_id=dag.dag_id,
        external_task_id='last_operator',
        allowed_states=[State.SUCCESS],
        execution_delta=dag.schedule_interval
    )

ExternalTaskSensor 는 센싱할 task의 execution_date 를 현재 task와 동일하게 맞춰줘야하지만 execution_delta 또는 execution_date_fn 값을 이용하여 센싱할 task의 execution_date를 조절한다

exexcution_delta 는 센싱할 task와 현재 task의 excecution_date 의 차이를 의미하고 우리는 이전에 실행된 dag run의 task가 필요하므로 dag.schedule_interval 로 넣어주면 정상적으로 이전 task를 찾아 센싱하게 된다

이렇게 설정을 넣어준 후 직접 marked as success를 수동으로 해주면 그 다음 run부터 정상적으로 실행하게 된다

catchup = False 일 때

    start_sensor = ExternalTaskSensor(
        task_id='start_sensor',
        external_dag_id=dag.dag_id,
        external_task_id='last_operator',
        allowed_states=[State.SUCCESS],
        execution_date_fn=lambda x:dag.get_dagrun(x).get_previous_dagrun().execution_date
    )

catchup = False이면 이전에 실행된 dag run이 schedule_interval만큼 이전에 실행되었다는 보장이 없다

따라서 이전 dag run을 찾고 거기에서 execution_date를 찾아와야한다

execution_delta 대신 execution_date_fn을 사용했고 여기서 이전 dag run의 execution_date를 가져와주면 된다

이 때도 최초실행 시 marked as success 한번 해주면 된다

catchup=True 일 때도 똑같이 설정해줘도 된다

profile
개발자

0개의 댓글