Airflow 코드 구성은 기본적으로 DAG와 Operator로 구성된다.
dag_id
, start_date
, schedule_interval
등을 설정import airflow.utils.dates
from airflow import DAG
dag=DAG(
dag_id = "test_dag", # Airflow UI에 출력되는 DAG 이름
start_date= airflow.utils.dates.days_ago(14), # 시작 시점에서 14일 전 부터 task를 실행 14번 task를 실행하게됨
schedule_interval=None,
)
from airflow.operators.python import PythonOperator
def test_function(): # PythonOperator에서 호출되는 python 함수
print("hello airflow python operator")
test_python_operator = PythonOperator(
task_id = "test_python_operator",
python_callable = test_function, # test_function 함수 사용
dag = dag
)
from airflow.operators.bash import BashOperator
test_bash_operator = BashOperator(
task_id="test_bash_operator",
bash_command='echo "hello BashOperator"' # bash command 실행
dag = dag
)
>>
연산사를 통해서 operator를 순차적으로 실행# python operator 실행 후 bash operator를 실행
test_python_operator >> test_bash_operator
first_dag
의 print_hello_world
, echo_hello_world
의 task log에 hello world
출력 import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
dag=DAG(
dag_id = "first_dag",
start_date= airflow.utils.dates.days_ago(14),
schedule_interval="@daily",
)
def _print_hello_world():
print("hello_world")
print_hello_world = PythonOperator(
task_id = "print_hello_world",
python_callable = _print_hello_world,
dag = dag
)
echo_hello_world = BashOperator(
task_id = "echo_hello_world",
bash_command = 'echo "echo hello world"',
dag = dag
)
print_hello_world >> echo_hello_world
python print와 bash echo가 log에 잘 출력되었다!