[airflow] dag 생성

eve·2022년 12월 4일
0

airflow

목록 보기
2/4
  1. 기본 폼

from airflow import DAG

with DAG() as dag:

dag라는 객체는 데이터 파이프라인 그 자체이다. 'DAG()' 안의 파라미터 값들을 조정해서 dag를 생성한다.



  1. 첫번째 파라미터 : 'dag id'
with DAG('forex_data_pipeline') as dag:

dag id는 각각의 dag가 가진 유일한 id값을 의미한다.모든 dag id 중 겹치는 id값이 전혀 없어야 한다.



  1. 두번째 파라미터 : 'start_date'
from datetime import datetime

with DAG('forex_data_pipeline', start_date=datetime(2022, 12, 4)) as dag

scheduling을 멈추는 날짜를 지정하는 파라미터로, 'datetime'이라는 모듈을 import해야 사용할 수 있다.



  1. 세 번째 파라미터: 'schedule_interval'
with DAG('forex_data_pipeline', start_date=datetime(2022, 12, 4), schedul_interval="@daily) as dag

dag 실행의 빈도수를 지정하는 파라미터이다. @daily라고 지정하게 되면 매일 밤 자정에 dag가 실행되게 된다.



  1. 다섯 번째 파라미터: default_args

from datetime import datetime, timedelta

default_args = {
	"owner": "airflow",
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "admin@localhost.com",
    "retries": 1,
	"retry_delay": timedelta(minutes=5)
}

dag에 할당할 디테일한 인자들을 지정한다.

  • 소유자
  • 이메일 로그인 실패할 경우 반응
  • 실패 시 airflow의 재시도 여부
  • 이메일 지정 + 접근 허용할 이메일 주소
  • task 재시도 가능 횟수
  • task 실패 시, 대기 시간 지정

timedelta를 import해야 사용할 수 있으며, DAG() 함수 안의 마지막 파라미터로 지정해주어야 한다.

with DAG('forex_data_pipeline', start_date=datetime(2022, 12, 4), schedul_interval="@daily, default_args) as dag
profile
유저가 왜 그랬을까

0개의 댓글