[Airflow] scheduling

Woong·2024년 11월 12일
0

Apache Airflow

목록 보기
2/9

schedule_interval

  • schedule_interval 로 DAG 를 일정 시간 간격마다 실행하도록 설정
    • @once: 1회만 실행
    • @hourly: 매시 정각
    • @daily: 매일 자정
    • @weekly: 매주 일요일 자정
    • @monthly: 매월 1일 자정
    • @yearly: 매년 1월 1일 자정
dag = DAG(
    dag_id="06_templated_query",
    schedule_interval="@daily",
    start_date=dt.datetime(year=2019, month=1, day=1),
    end_date=dt.datetime(year=2019, month=1, day=5),
)

cron 기반 스케줄 간격

특정 빈도마다 스케줄 정의 불가

  • crontab 과 동일한 형태로 스케줄 간격 지정
    ex) 0 0 * * MON, WED, FRI : 월/수/금 자정

빈도 기반 스케줄 간격

  • timedelta로 특정 시간 간격으로 설정
    • days, minutes, hours

dag=DAG(
  dag_id="test",
  schedule_interval=dt.timedelta(days=3), 
  ...
)

실행 날짜로 동적 시간 참조

시간 기반 프로세스 워크플로의 경우 작업이 실행되는 시간 간격을 아는 것이 중요 -> 추가 매개변수 제공

  • execution_date: 해당 스케줄 간격의 시작 시간
  • next_execution_date: 스케줄 간격의 종료 시간
  • previous_execution_date: 과거 스케줄 간격의 시간

Execution_date(실행날짜)는 스케줄 간격의 실행 시작 시간 (DAG 시작 시간 X)

위 두 매개변수(next_execution_date, previous_execution_date)는 스케줄 간격 이후의 DAG 실행을 통해서만 정의되므로, UI나 CLI로 수동으로 실행 시 매개변수 값이 정의 되지 않아 사용할 수 없음

특정 날짜 지정 및 jinja 템플릿 활용

"start_date={{execution_date.strftime('%Y-%m-%d')}}" 
"&end_date={{next_execution_date.strftime('%Y-%m-%d')}}"

"start_date={{ds}}" 
"&end_date={{next_ds}"

축약 매개변수 제공

  • ds, ds_nodash: 스케줄 간격의 시작 시간 (YYYY-MM-DD, YYYYMMDD)
  • next_ds, next_ds_nodash : 스케줄 간격의 종료 시간 (YYYY-MM-DD,YYYYMMDD)
  • prev_ds, prev_ds_nodash: 과거 스케줄 간격의 시간 (YYYY-MM-DD, YYYYMMDD)

데이터 파티셔닝

def _cal_stats(**context): 
   """Calculates event statistics"""
   input_path = context["templates_dict"]["input_path"] 
   output_path = context["templates_dict"]["output_path"] 
... (생략) 

cal_stats = PythonOperator( 
   task_id="cal_stats", 
   python_callable=_cal_stats, 
   templates_dict={ 
      "input_path" = "/data/events/{{ds}}.json", # 템플릿되는 값 전달
      "output_path" = "/date/events/{{ds}}.csv" 
   },
   dag=dag,
}

Backfill

  • Backfill: 과거 특정 기간 동안 실행하지 못한 태스크나 DAG 실행을 다시 수행하는 작업

  • DAG의 catchup 매개변수를 true가 디폴트 값으로, 과거의 스케줄 간격을 포함하여 backfill 실행

  • catchup false 로 설정하여 비활성화 가능

태스크 디자인 모범사례

원자성 (Atomicity)

모든 것이 완료되거나 실패

  • 성공적으로 수행하여 적절한 결과 생성 or 시스템에 영향 주지 않고 실패

  • 원자성 유지하기 위해 다수의 task로 분리

    • task 사이에 강한 의존성이 있을 경우에는 하나의 일관된 task 단위 형성이 더 나음
    • (e.g. api 호출 전 로그인이 필요한 경우)

2. 멱등성 (Idempotency)

  • 동일한 입력으로 동일한 태스크 여러번 호출해도 결과는 동일
    일관성과 장애 처리를 보장

  • 멱등성 태스크 : 실행 날짜별({{ds}}) 각 json파일 존재하여 다시 실행해도 결과 동일

  • 비멱등성 태스크 : 단일 파일에 이벤트 추가되는 경우 태스크 재실행하면 결과에 이벤트가 추가됨

reference

  • 서적 'Apache Airflow 기반의 데이터 파이프라인'

0개의 댓글