개별 Task에서 사용할 수 있는 외부 데이터에 대해 알아보자
admin의 variable에서 추가, 삭제
Task에서 참조할 수 있는 클러스터 변수
variable 이용한 사용
from airflow.models import Variable
# Normal call style
foo = Variable.get("foo")
# Auto-deserializes a JSON value
bar = Variable.get("bar", deserialize_json=True)
# Returns the value of default_var (None) if the variable is not set
baz = Variable.get("baz", default_var=None)
# Raw value
echo {{ var.value.<variable_name> }}
# Auto-deserialize JSON value
echo {{ var.json.<variable_name> }}
외부 Database 등의 연결
Web-UI에서 등록
echo {{ conn.<conn_id>.host }}
물리적으로 서로 다른 환경에서 동작하는 task들 사이에서 데이터를 주고받을 때 사용
task_id 및 dag_id로 식별, MetaDB에 저장됨
대량의 데이터를 전달하는 데 사용하면 안됨!
PUSH
@task
# ti = task instance
def push(ti=None):
"""Pushes an XCom without a specific target"""
ti.xcom_push(key="value from pusher 1", value="hello")
@task
def foo(ti=None):
ti.xcom_push(key="table_name", value="name"
# Pulls the return_value XCOM from "pushing_task"
pulled_value_1 = ti.xcom_pull(task_ids="push", key="value from pusher 1")
# jinja
SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}