DAG는 크게 DAG인스턴스를 생성하는 부분, Operator(task)를 생성하는 부분, task간 의존성을 설정하는 부분으로 나뉜다.
DAG 클래스의 인스턴스를 생성해준다.
dag_id
인수에 Airflow 인터페이스에 나타나는 DAG 이름을 정의하고 (파일명으로 설정되는 줄 알고 삽질했다.ㅎ)
start_date
에 DAG를 처음 실행할 날짜를 설정한다.
schedule_interval
를 통해 일정 시간 간격으로 DAG를 실행할 수 있도록 설정할 수 있다. None
으로 설정한다면 자동으로 DAG가 실행되지 않는다.
Operator들은 이렇게 생성한 dag 인스턴스를 참조하여 Airflow에게 자신이 어떤 DAG에 속해있는지 알려준다.
dag = DAG(
dag_id="download_rocket_launches", # 인터페이스에서 나타나는 DAG 이름
description="Download rocket pictures of recently launched rockets.",
start_date=airflow.utils.dates.days_ago(14), # DAG 실행 날짜
schedule_interval="@daily", # DAG 실행간격
)
각 Operator는 하나의 Task를 수행하고 이 Operator들이 모여 DAG을 구성한다.
독립적으로 실행될 수도 있고, 의존성을 어떻게 설정하느냐에 따라 순서에 따라 실행될 수도 있다.
다음은 bash 스크립트를 실행하여 결괏값을 받아오기 위해 BashOperator를 사용한 예제이다.
# 1. BashOperator 사용 - bash 스크립트를 실행하여 결괏값을 받아온다
download_launches = BashOperator(
task_id="download_launches", # 태스크 명으로 DAG의 노드에 표시되는 이름이다.
bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'",
dag=dag, # dag 참조함으로써 Airflow에게 어떤 DAG에 포함되어있는지를 알린다.
)
Airflow는 Task를 다양한 방법으로 수행할 수 있도록 다양한 Operator를 제공한다.
PythonOperator는 Python 함수를 실행할 수 있게 해준다. PythonOperator에서 python_callable
인수는 실행할 함수를 가리킨다. callable 함수로 설정할 함수를 먼저 생성하고 PythonOperator 생성 시 python_callable
의 인자로 해당 함수를 넘겨준다.
# 2. PythonOperator 사용 - Operator의 콜러블에 전달할 함수명을 태스크 이름과 동일하게 지어주고 앞에 _를 붙인다.
# 결괏값을 파싱하고 모든 로켓 사진을 다운로드한다.
def _get_pictures():
# 디렉토리가 있는 지 확인한다.
pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)
# launches.jso으로 이미지를 다운로드
with open("/tmp/launches.json") as f:
launches = json.load(f)
image_urls = [launch["image"] for launch in launches["results"]]
for image_url in image_urls:
try:
response = requests.get(image_url)
image_filename = image_url.split("/")[-1]
target_file = f"/tmp/images/{image_filename}"
with open(target_file, "wb") as f:
f.write(response.content)
print(f"Downloaded {image_url} to {target_file}")
except requests_exceptions.MissingSchema:
print(f"{image_url} appears to be an invalid URL.")
except requests_exceptions.ConnectionError:
print(f"Could not connect to {image_url}.")
get_pictures = PythonOperator(
task_id="get_pictures",
python_callable=_get_pictures, # 콜러블 함수를 설정한다.
dag=dag
)
Operator과 Task의 차이점
사용자 관점에서는 Operator와 Task를 같은 의미로 이해할 수 있고, 또 그렇게 사용되고 있지만
Airflow관점에서는 Task는 작업의 올바른 실행을 보장하기 위한 Operator의 Wrapper/Manager로 생각할 수 있다 (..? 이해가 가지 않음)
생성한 Task를 올바른 순서로 실행하기 위해서 >>
(오른쪽 시프트 연산자)로 Task간 의존성을 설정한다.
이를 통해 download_launches
가 완료된 후에만 get_pictures
가 실행이 되고, notify
도 마찬가지로 get_pictures
의 실행이 완료된 후에야 Task 실행을 시작할 수 있다.
download_launches >> get_pictures >> notify
앞서 DAG를 download_launches >> get_pictures >> notify
와 같이 실행되도록 정의하였다.
나는 Docker기반으로 Airflow를 실행시켜 DAG를 확인했다.