Airflow Variable, Connection, XCOM

Alan·2023년 3월 23일
0

Airflow 맛보기

목록 보기
7/7

개별 Task에서 사용할 수 있는 외부 데이터에 대해 알아보자

Variable

  • admin의 variable에서 추가, 삭제

  • Task에서 참조할 수 있는 클러스터 변수

  • variable 이용한 사용

from airflow.models import Variable

# Normal call style
foo = Variable.get("foo")

# Auto-deserializes a JSON value
bar = Variable.get("bar", deserialize_json=True)

# Returns the value of default_var (None) if the variable is not set
baz = Variable.get("baz", default_var=None)
  • jinja template
# Raw value
echo {{ var.value.<variable_name> }}

# Auto-deserialize JSON value
echo {{ var.json.<variable_name> }}

Connections

  • 외부 Database 등의 연결

  • Web-UI에서 등록

echo {{ conn.<conn_id>.host }}
  • 위 코드처럼 호출할 수 있음

XCOM

  • 물리적으로 서로 다른 환경에서 동작하는 task들 사이에서 데이터를 주고받을 때 사용

  • task_id 및 dag_id로 식별, MetaDB에 저장됨

  • 대량의 데이터를 전달하는 데 사용하면 안됨!

  • PUSH

@task
# ti = task instance
def push(ti=None):
    """Pushes an XCom without a specific target"""
    ti.xcom_push(key="value from pusher 1", value="hello")

@task
def foo(ti=None):
    ti.xcom_push(key="table_name", value="name"
  • PULL
# Pulls the return_value XCOM from "pushing_task"
pulled_value_1 = ti.xcom_pull(task_ids="push", key="value from pusher 1")

# jinja
SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}

0개의 댓글