schedule_interval
로 DAG 를 일정 시간 간격마다 실행하도록 설정@once
: 1회만 실행@hourly
: 매시 정각@daily
: 매일 자정@weekly
: 매주 일요일 자정@monthly
: 매월 1일 자정@yearly
: 매년 1월 1일 자정dag = DAG(
dag_id="06_templated_query",
schedule_interval="@daily",
start_date=dt.datetime(year=2019, month=1, day=1),
end_date=dt.datetime(year=2019, month=1, day=5),
)
특정 빈도마다 스케줄 정의 불가
crontab
과 동일한 형태로 스케줄 간격 지정0 0 * * MON, WED, FRI
: 월/수/금 자정timedelta
로 특정 시간 간격으로 설정days
, minutes
, hours
등
dag=DAG(
dag_id="test",
schedule_interval=dt.timedelta(days=3),
...
)
시간 기반 프로세스 워크플로의 경우 작업이 실행되는 시간 간격
을 아는 것이 중요 -> 추가 매개변수 제공
Execution_date(실행날짜)는 스케줄 간격의 실행 시작 시간 (DAG 시작 시간 X)
위 두 매개변수(next_execution_date, previous_execution_date)는 스케줄 간격 이후의 DAG 실행을 통해서만 정의되므로, UI나 CLI로 수동으로 실행 시 매개변수 값이 정의 되지 않아 사용할 수 없음
"start_date={{execution_date.strftime('%Y-%m-%d')}}"
"&end_date={{next_execution_date.strftime('%Y-%m-%d')}}"
"start_date={{ds}}"
"&end_date={{next_ds}"
축약 매개변수 제공
ds
, ds_nodash
: 스케줄 간격의 시작 시간 (YYYY-MM-DD
, YYYYMMDD
)next_ds
, next_ds_nodash
: 스케줄 간격의 종료 시간 (YYYY-MM-DD
,YYYYMMDD
)prev_ds
, prev_ds_nodash
: 과거 스케줄 간격의 시간 (YYYY-MM-DD
, YYYYMMDD
)def _cal_stats(**context):
"""Calculates event statistics"""
input_path = context["templates_dict"]["input_path"]
output_path = context["templates_dict"]["output_path"]
... (생략)
cal_stats = PythonOperator(
task_id="cal_stats",
python_callable=_cal_stats,
templates_dict={
"input_path" = "/data/events/{{ds}}.json", # 템플릿되는 값 전달
"output_path" = "/date/events/{{ds}}.csv"
},
dag=dag,
}
Backfill
: 과거 특정 기간 동안 실행하지 못한 태스크나 DAG 실행을 다시 수행하는 작업
DAG의 catchup 매개변수를 true가 디폴트 값으로, 과거의 스케줄 간격을 포함하여 backfill 실행
catchup false 로 설정하여 비활성화 가능
모든 것이 완료되거나 실패
성공적으로 수행하여 적절한 결과 생성 or 시스템에 영향 주지 않고 실패
원자성 유지하기 위해 다수의 task로 분리
동일한 입력으로 동일한 태스크 여러번 호출해도 결과는 동일
* 일관성과 장애 처리를 보장
멱등성 태스크 : 실행 날짜별({{ds}}
) 각 json파일 존재하여 다시 실행해도 결과 동일
비멱등성 태스크 : 단일 파일에 이벤트 추가되는 경우 태스크 재실행하면 결과에 이벤트가 추가됨