[Airflow] dag 실행시 argument를 전달하여 실행하는 방법

v K_Y v·2023년 2월 7일
0

AIRFLOW

목록 보기
1/4
post-thumbnail

airflow를 사용하다보면 개발/운영 환경의 분리나 초기 적재/데일리 적재 등 옵션에 따라 다른 프로세스를 수행해야 할 때가 있다. 그때마다 dag나 task를 만들거나 직접 소스 내, config 파일을 바꾸는 것은 너무 비효율적이다. 이런 상황에서 사용할 수 있는 TIP이 있어 정리하고자 한다.

Trigger Dag w/ config

@dag(
    default_args=args,
    description="파이프라인",
    schedule_interval="30 3 * * *",
    catchup=False,
    tags=["data"],
    params={
        "mode" : "daily",
        "env" : "dev"
    }
)
def test_dag():
  ...

dag 정의 시, params라는 옵션을 입력한다. JSON형태로 key-value는 사용자가 정의 가능함.

  • UI에서 Trigger DAG w/ config를 클릭하면 dag 내에서 지정한 params를 볼 수 있음
  • params를 바꿔서 실행해야 한다면 화면에서 params를 바꾼 후 Trigger 버튼을 클릭하면 트리거 시점에 params를 변경할 수 있음
  • 스케줄링 실행이나 기본 트리거 시에는 dag 내에 정의한 params가 반영됩니다.
airflow dags trigger --conf '{"mode":"dev", "env":"dev"}' test_dag
  • CLI를 통해서 트리거 시에는 이와 같이 작성

params 사용법

위와 같이 지정한 params는 operator마다 사용방법이 다름
1. bash operator

test_operator = BashOperator(
        task_id="test",
        bash_command="bash " + source_path + "/src/script/test.sh {{ params.env }}"
)
  • bash operator에서는 Jinja template 형태로 사용
  1. python operator
def choose_mode(**context):
    mode = context['params']['mode']
    if mode == 'init':
        target = 'init_generate_data'
    else:
        target = 'daily_generate_data'
    return target
    
task_branch = BranchPythonOperator(
        task_id='task_branch',
        python_callable=choose_mode
)
  • python operator에서는 context를 이용하여 전달받은 params를 읽을 수 있음
profile
📌 기억하기 위해 남기는 기록들

0개의 댓글