Airflow Operator 살펴보기

Alan·2023년 3월 23일
0

Airflow 맛보기

목록 보기
5/7

Operators

  • Operator는 airflow가 완료될 task 단위를 정의

  • 모든 Operator는 Base Operator에서 상속, 여기에는 task를 실행하는 데 필요한 모든 인수(ex. logical_date)가 포함됨

  • 각 Operator는 완료 중인 task 유형에 대한 고유한 인수를 포함. 주요 Operator로는 PythonOperator, BashOperator, KubernetesPodOperator 등이 존재

  • Airflow는 사용자가 Operator에게 전달하는 인수를 기반으로 task를 완료. 이전에 살펴본 tutorial에서는 BashOperator를 이용하여 몇 가지 bash 스크립트를 실행한 것

Tasks

  • DAG에서 Operator를 사용하려면 Task로 인스턴스화해야 함

  • Task는 DAG 컨텍스트 내에서 Operator의 Task를 실행하는 방법을 결정

  • 이전 예제를 살펴보면, 두 개의 별도 bash script를 실행하기 위해 BashOperator를 두 개의 별도 task로 인스턴스화 하는 것을 알 수 있음

  • 이때 'task_id'는 task의 고유 식별자 역할을 함(Web-UI에 표시되는 항목임)

t1 = BashOperator(
    task_id="print_date",
    bash_command="date",
)

t2 = BashOperator(
    task_id="sleep",
    depends_on_past=False,
    bash_command="sleep 5",
    # Overriding Default Argument
    retries=3,
)
  • Task Argument의 우선순위

    • 명시적으로 전달된 argument

    • 'default_args' dictionary에 존재하는 값

    • operator의 default 값(존재하는 경우)

Jinja Template

  • Airflow는 Jinja Template의 기능으로 파이프라인 작성자에게 일련의 기본 제공 매개변수 및 매크로를 제공

  • 자체 매개변수, 메크로 및 템플릿을 정의할 수 있음

  • 아래에서 {{ ds }}는 today's "date stamp"

    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )

DAG & Tasks Documentation

  • DAG는 Markdown만 지원

  • Tasks는 일반 텍스트, Markdown, reStructuredText, json 및 yaml을 지원

    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    # [END documentation]

Setting up Dependencies



t1.set_downstream(t2)
t2.set_upstream(t1)

t1 >> t2
t2 << t1

t1 >> t2 >> t3

t1.set_downstream([t2, t3])
t1 >> [t2, t3]
  • 이 때 순환구조를 가지면 에러 발생

  • Web-UI의 Graph 항목에서 확인할 수 있음

Airflow Testing

  • Script validation

    • tutorial 코드를 task_id만 tutorial_example로 바꿔 tutorial_example.py로 /usr/airflow/dags 경로에 생성하여 테스트 진행

    • 문법 오류 등의 test를 하는 과정은 단순히 python script를 실행시켜보면됨

    • docker 내부 python 버전과 다를 것이므로 docker compose 명령을 통해 실행

    • mount된 도커 내부 경로는 /opt/airflow/dags 임

docker compose run airflow-worker python /opt/airflow/dags/tutorial_example.py
  • CLI Metadata Validation
docker compose run airflow-worker airflow db init
docker compose run airflow-worker airflow dags list
docker compose run airflow-worker airflow tasks list tutorial_example
docker compose run airflow-worker airflow tasks list tutorial_example --tree
  • Testing

    • airflow tasks test 명령은 작업 인스턴스를 로컬로 실행하고 실행로그를 stdout으로 출력

    • 종속성과 무관하게 상태(실행, 성공, 실패 등)을 데이터베이스에 전달하지 않음

    • airflow dags test도 마찬가지지만 DAG level에서 작동

    • 특정 날짜의 task instance를 실행시키고 싶을 때 logical date를 지정해서 test할 수도 있음

# command subcommand [dag_id] [task_id] [(optional) date]

# testing print_date
docker compose run airflow-worker airflow tasks test tutorial_example print_date 2022-06-01

# testing sleep
docker compose run airflow-worker airflow tasks test tutorial_example sleep 2022-06-01

# testing templated
docker compose run airflow-worker airflow tasks test tutorial_example templated 2022-06-01

docker compose run airflow-worker airflow dags test tutorial_example 2022-06-01

0개의 댓글