airflow에서 DAG를 구성하다보면 dag가실행중 에러가 나면 그다음 dag run을 실행하지 않도록 구성하고 싶을 때가 있다
가장 간단한 방법은 태스크에 depends_on_past , wait_for_downstream 설정을 True로 넣어주는 방법이다
args = {
'owner': 'my',
'depends_on_past': True, # 이전 dag run에서 현재 task가 실패했으면 실행하지 않음
'wait_for_downstream': True # 이전 dag run에서 현재 task의 downstream이 실패했으면 실행하지 않음
}
dag = DAG(
dag_id='DAG ID',
default_args=args,
start_date=days_ago(1),
schedule_interval=timedelta(minutes=30),
max_active_runs=1
)
하지만 이 방법은 task의 바로 아래에 붙어있는(downstream)이 실패했을 때만 그 task가 실행되지 않으므로 일단 dag가 실행은 되게 된다
그래서 dag 전체를 실행하지 않으려면 ExternalTaskSensor 를 사용해야 한다.
DAG의 가장 앞에 ExternalTaskSensor 를 만들고 DAG의 마지막에 dummy_operator 를 하나 만든다음 sensor에서 이 마지막 dummy_operator 의 성공여부를 센싱하도록 하면 앞의 dag run이 성공 했을 때만 실행하도록 할 수 있다
DAG의 catchup 설정에 따라 사용법이 조금 달라진다
start_sensor = ExternalTaskSensor(
task_id='start_sensor',
external_dag_id=dag.dag_id,
external_task_id='last_operator',
allowed_states=[State.SUCCESS],
execution_delta=dag.schedule_interval
)
ExternalTaskSensor 는 센싱할 task의 execution_date 를 현재 task와 동일하게 맞춰줘야하지만 execution_delta 또는 execution_date_fn 값을 이용하여 센싱할 task의 execution_date를 조절한다
exexcution_delta 는 센싱할 task와 현재 task의 excecution_date 의 차이를 의미하고 우리는 이전에 실행된 dag run의 task가 필요하므로 dag.schedule_interval 로 넣어주면 정상적으로 이전 task를 찾아 센싱하게 된다
이렇게 설정을 넣어준 후 직접 marked as success를 수동으로 해주면 그 다음 run부터 정상적으로 실행하게 된다
start_sensor = ExternalTaskSensor(
task_id='start_sensor',
external_dag_id=dag.dag_id,
external_task_id='last_operator',
allowed_states=[State.SUCCESS],
execution_date_fn=lambda x:dag.get_dagrun(x).get_previous_dagrun().execution_date
)
catchup = False이면 이전에 실행된 dag run이 schedule_interval만큼 이전에 실행되었다는 보장이 없다
따라서 이전 dag run을 찾고 거기에서 execution_date를 찾아와야한다
execution_delta 대신 execution_date_fn을 사용했고 여기서 이전 dag run의 execution_date를 가져와주면 된다
이 때도 최초실행 시 marked as success 한번 해주면 된다
catchup=True 일 때도 똑같이 설정해줘도 된다