[Airflow] DAG 구조

Woong·2024년 11월 12일
0

Apache Airflow

목록 보기
1/9

DAG 객체 인스턴스 생성

import json
import pathlib

import airflow.utils.dates
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

"""
DAG 객체 인스턴스 생성.
모든 workflow의 시작점.
workflow 내의 모든 task는 DAG 개체를 참조 -> 어떤 task가 어떤 DAG 에 속하는지 확인할 수 있다.
"""
dag = DAG(
    dag_id="download_rocket_launches", 
    description="Download rocket pictures of recently launched rockets.",
    start_date=airflow.utils.dates.days_ago(14), # DAG 처음 실행 시작 날짜
    schedule_interval="@daily", # DAG 실행 간격. None 일 경우 자동 실행 안됨
)

Operator 인스턴스

  • 각 operator 는 하나의 task 수행
  • 여러 operator 가 workflow 또는 DAG를 구성

"""
bash command 실행하는 BashOperator
"""
download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'",  # noqa: E501
    dag=dag, # 모든 오퍼레이터는 dag 변수를 참조하여 인스턴스가 어떤 DAG 에 속하는지 airflow에게 알려줌
)
download_launches >> get_pictures >> notify
  • >> 로 태스크 실행 순서 정의

task vs operator

  • operator : 단일 작업 수행 역할
  • task: operator 의 wrapper 또는 manager 역할

PythonOperator

  • python 함수를 실행하는 operator
def _get_pictures():
    # Ensure directory exists
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)

    # Download all pictures in launches.json
    with open("/tmp/launches.json") as f:
        launches = json.load(f)
        image_urls = [launch["image"] for launch in launches["results"]]
        for image_url in image_urls:
            try:
                response = requests.get(image_url)
                image_filename = image_url.split("/")[-1]
                target_file = f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")

"""
PythonOperator 는 python 코드 실행 담당
1. operator 자신(get_pictures) 을 정의해야 함
2. python_callable : 호출 가능한 일반 함수
편의를 위해 변수명과 task_id 동일하게 작성
"""
get_pictures = PythonOperator(
    task_id="get_pictures", python_callable=_get_pictures, dag=dag
)

Airflow 실행

  • 의존성 설치

    • pip install apache-airflow
  • metastore 초기화

    • metastore: airflow 상태를 저장하는 db
airflow db init
  • user 생성
airflow users create --username admin --password admin --firstname Anonymous --lastname Admin --role Admin --email admin@example.org
  • scheduler, 웹서버 시작
// 작성한 DAG 를 DAG 디렉터리에 복사한 후 스케줄러와 웹서버 시작
cp donwload_rocket_launches.py ~/airflow/dags/
airflow webserver
airflow scheduler

도커 컨테이너에서 Airflow 실행

#!/bin/bash

SCRIPT_DIR=$(cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd)

docker run \
-ti \
-p 8080:8080 \
-v ${SCRIPT_DIR}/../dags/download_rocket_launches.py:/opt/airflow/dags/download_rocket_launches.py \
--name airflow
--entrypoint=/bin/bash \
apache/airflow:2.0.0-python3.8 \
-c '( \
airflow db init && \
airflow users create --username admin --password admin --firstname Anonymous --lastname Admin --role Admin --email admin@example.org \
); \
airflow webserver & \
airflow scheduler \
'

reference

  • 서적 'Apache Airflow 기반의 데이터 파이프라인'
  • github 예제

0개의 댓글

Powered by GraphCDN, the GraphQL CDN