[Airflow] 기본 개념

CHAN LIM·2024년 2월 19일
0

Airflow

목록 보기
2/4


Code 및 중요 한 줄

Airflow Python 스크립트가 실제로는
DAG의 구조를 코드로 지정하는 구성 파일일 뿐

DAG 정의 파일을 실제 데이터 처리를 수행할 수 있는 장소로 생각하지만
전혀 그렇지 않습니다.


import textwrap
from datetime import datetime, timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["airflow@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    t1.doc_md = textwrap.dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = textwrap.dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]

DAG

Direct Acycled Graph

  • 관계와 의존성(Dependencies)을 가진 작업(Task)들의 집합

default_args (기본 매개변수)

DAG안에 들어갈 요소들을 Key-Value 형식으로 지정해 넣을 수 있다.

# 이 인수들은 각 연산자에 전달된다.
# 연산자 초기화 중에 Task별로 이를 재정의할 수 있다.
default_args={
    "depends_on_past": False,
    "email": ["airflow@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function, # or list of functions
    # 'on_success_callback': some_other_function, # or list of functions
    # 'on_retry_callback': another_function, # or list of functions
    # 'sla_miss_callback': yet_another_function, # or list of functions
    # 'trigger_rule': 'all_success'
},

Operator 및 Task

t1 = BashOperator(
    task_id="print_date",
    bash_command="date",
)

t2 = BashOperator(
    task_id="sleep",
    depends_on_past=False,
    bash_command="sleep 5",
    retries=3,
)

Operator

Operator는 Airflow가 완료할 작업 단위를 정의

즉, 특정 행위를 할 수 있는 기능을 모아 놓은 클래스 - 설계도

Operator를 사용하는 것은,
DAG 코드로 Task 의존성을 시각화하는 것을 돕는다.

PythonOperator, BashOperator, KubernetesPodOperator
가 매우 많이 사용된다.


Task

Operator에서 객체화되어 DAG에서 실행 가능한 오브젝트

DAG에서 Operator를 사용하기 위해, Task로 인스턴스화 필요

Task가 DAG에서 Operator가 어떻게 실행될 지 결정한다.


Dependencies 설정

t1.set_downstream(t2)

# 이것은 t2가 t1이 성공적으로 실행되어야 실행됨을 의미합니다.
# 다음과 동일합니다:
t2.set_upstream(t1)

# 비트 시프트 연산자는 작업을 연쇄시키는 데 사용할 수 있습니다:
t1 >> t2

# 그리고 비트 시프트 연산자로 상위 의존성을 설정합니다:
t2 << t1

# 여러 종속성을 연결하는 것은 비트 시프트 연산자로 간결해집니다:
t1 >> t2 >> t3

# 작업 목록을 종속성으로 설정할 수도 있습니다. 이러한 작업들은 동일한 효과를 갖습니다:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
profile
클라우드, 데이터, DevOps 엔지니어 지향 || 글보단 사진 지향

0개의 댓글