import json
import pathlib
import airflow.utils.dates
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
"""
DAG 객체 인스턴스 생성.
모든 workflow의 시작점.
workflow 내의 모든 task는 DAG 개체를 참조 -> 어떤 task가 어떤 DAG 에 속하는지 확인할 수 있다.
"""
dag = DAG(
    dag_id="download_rocket_launches",
    description="Download rocket pictures of recently launched rockets.",
    start_date=airflow.utils.dates.days_ago(14),  # date from which the DAG starts running
    schedule_interval="@daily",  # how often the DAG runs; None means it only runs when triggered manually
)
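"""
Besides presets like @daily, schedule_interval also accepts cron expressions
and timedelta objects. A minimal sketch of equivalent alternatives (shown as
comments; these lines are not part of the DAG file itself):
"""
# from datetime import timedelta
# schedule_interval="@daily"             # preset shorthand
# schedule_interval="0 0 * * *"          # cron: every day at midnight
# schedule_interval=timedelta(days=1)    # frequency-based interval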
"""
bash command 실행하는 BashOperator
"""
download_launches = BashOperator(
task_id="download_launches",
bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'", # noqa: E501
dag=dag, # 모든 오퍼레이터는 dag 변수를 참조하여 인스턴스가 어떤 DAG 에 속하는지 airflow에게 알려줌
)
def _get_pictures():
    # Ensure directory exists
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)

    # Download all pictures in launches.json
    with open("/tmp/launches.json") as f:
        launches = json.load(f)
        image_urls = [launch["image"] for launch in launches["results"]]
        for image_url in image_urls:
            try:
                response = requests.get(image_url)
                image_filename = image_url.split("/")[-1]
                target_file = f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")
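"""
Because _get_pictures is a plain Python function, it can be exercised outside
Airflow for a quick local check. A minimal sketch (shown as comments), assuming
/tmp/launches.json has already been downloaded, e.g. by the curl command above:
"""
# if __name__ == "__main__":
#     _get_pictures()  # downloads images into /tmp/images/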
"""
PythonOperator 는 python 코드 실행 담당
1. operator 자신(get_pictures) 을 정의해야 함
2. python_callable : 호출 가능한 일반 함수
편의를 위해 변수명과 task_id 동일하게 작성
"""
get_pictures = PythonOperator(
task_id="get_pictures", python_callable=_get_pictures, dag=dag
)
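"""
The dependency chain below references a notify task that this listing never
defines; a minimal sketch of one, assuming a simple BashOperator that reports
how many images were downloaded:
"""
notify = BashOperator(
    task_id="notify",
    bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images in /tmp/images."',  # assumed notification command
    dag=dag,
)
"""
The >> operator defines the task execution order.
"""
download_launches >> get_pictures >> notify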
Install the dependencies:
pip install apache-airflow

Initialize the metastore and create a user:
airflow db init
airflow users create --username admin --password admin --firstname Anonymous --lastname Admin --role Admin --email admin@example.org

Copy the DAG into the DAGs directory, then start the webserver and scheduler
(each command blocks, so run them in separate terminals):
cp download_rocket_launches.py ~/airflow/dags/
airflow webserver
airflow scheduler
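A single task can also be run without the scheduler, which is handy while
developing; a minimal sketch using the Airflow CLI (the date is an arbitrary
example execution date):
airflow tasks test download_rocket_launches download_launches 2021-01-01

Alternatively, run everything in a Docker container; the script below mounts
the DAG file into the official Airflow image: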
#!/bin/bash
SCRIPT_DIR=$(cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd)
docker run \
  -ti \
  -p 8080:8080 \
  -v "${SCRIPT_DIR}/../dags/download_rocket_launches.py:/opt/airflow/dags/download_rocket_launches.py" \
  --name airflow \
  --entrypoint=/bin/bash \
  apache/airflow:2.0.0-python3.8 \
  -c '( \
  airflow db init && \
  airflow users create --username admin --password admin --firstname Anonymous --lastname Admin --role Admin --email admin@example.org \
  ); \
  airflow webserver & \
  airflow scheduler \
  '
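Save the script, make it executable, and run it (the filename run_airflow.sh
below is only an example):
chmod +x run_airflow.sh
./run_airflow.sh
Then open http://localhost:8080, log in with admin/admin (the user created
above), unpause the download_rocket_launches DAG, and either trigger it
manually or wait for the next @daily run.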