airflow를 설치하였으니 이제 스케쥴러를 사용해 보려고 한다
외주를 받아서 짜둔 코드가 있는데 그걸 매주 업데이트를 해줘야 하는 상황이다
매번 손으로 하기 번거로우니 스케쥴러 등록을 해서 사용해보자
Directed Acyclic Graph 이며 파이썬을 사용하여 작성하면 된다
하나의 DAG에는 하나 이상의 task가 있고 각 task가 실제 실행 되는 작업이다
DAG 파일은 /airflow/dags 안에 정의하면 된다
(airflow.cnf파일에 dags_folder 패스가 만들어져있다)
Operator는 task를 어떻게 실행 시킬지를 나타내는데 보통은 bash나 python오퍼레이터를 사용한다
나는 python 오퍼레이터를 사용해서 DAG를 작성했다
from airflow.operators.python import PythonOperator
from airflow import DAG
# DAG 생성
dag= DAG(
dag_id='test_dag', # 웹서버에 표시될 dag이름
description= 'testing dag file', # dag설명
start_date= datetime(2022, 6,21, tzinfo=kst), # 시작일시 시작일시는 과거로 작성해도된다
schedule_interval=None, # cron 표현식으로 scheduling
catchup= False # 이전에 실행되지 않았던 DAG 를 실행할지 말지 결정
)
여기서 shecule_interval은 cron표현식 처럼 사용하면 되는데 테스트는 일회성으로 보기위해 None으로 설정했다
(@once라고 해줘도 한번만 실행된다)
DAG를 작성후에 task를 정의해주면 된다
def hello_world():
print('hellp world')
def hello_world2():
print('hellp world2')
task1 = PythonOperator(
task_id = 'test_dag',
python_callable = hello_world2,
dag = dag
)
task2 = PythonOperator(
task_id = 'test_dag2',
python_callable = hello_world,
dag = dag
)
# dependencies
task1 >> task2
일단 잘 올라가는지 보기위해 테스트 함수를 작성하고 task두개를 만들었다
dependencies는 task의 순서라고 보면 될거같다
위와 같은 경우에는 task1이 실행 된 후에 task2가 실행된다
이 예제에서 task는 아무런 관계가 없지만 예를들어 이미지를 다운로드 한 다음에 무언가를 해야한다 ...? 이런경우에 task를 나누고 dependencies 를 정의하면 될거같다
파이썬 스크립트가 모두 작성되었다면 에어플로우 스케쥴러를 실행해보자
1) 웹서버를 먼저 띄우고
2) 탭을 하나더 열어 스케줄러를 실행한다
웹서버에 들어가서 보면 만들어둔 test_dag를 확인 할 수 있다
이 화면에서 task가 두개 정의 된거를 확인할수 있고 garph를 확인해보면
각 task의 dependency도 확인할수 있다
이제 테스트로 작성해둔 함수대신 내가 실제로 사용할 함수를 넣어주면 될거같다
끝!