Airflow DAG 작성하기

ohyujeong·2023년 5월 29일
0

Airflow

목록 보기
2/6

DAG 작성

DAG는 크게 DAG인스턴스를 생성하는 부분, Operator(task)를 생성하는 부분, task간 의존성을 설정하는 부분으로 나뉜다.

DAG 인스턴스 생성

DAG 클래스의 인스턴스를 생성해준다.
dag_id 인수에 Airflow 인터페이스에 나타나는 DAG 이름을 정의하고 (파일명으로 설정되는 줄 알고 삽질했다.ㅎ)
start_date 에 DAG를 처음 실행할 날짜를 설정한다.
schedule_interval 를 통해 일정 시간 간격으로 DAG를 실행할 수 있도록 설정할 수 있다. None 으로 설정한다면 자동으로 DAG가 실행되지 않는다.
Operator들은 이렇게 생성한 dag 인스턴스를 참조하여 Airflow에게 자신이 어떤 DAG에 속해있는지 알려준다.

dag = DAG(
    dag_id="download_rocket_launches", # 인터페이스에서 나타나는 DAG 이름
    description="Download rocket pictures of recently launched rockets.",
    start_date=airflow.utils.dates.days_ago(14), # DAG 실행 날짜
    schedule_interval="@daily", # DAG 실행간격
)

Operator 생성

각 Operator는 하나의 Task를 수행하고 이 Operator들이 모여 DAG을 구성한다.
독립적으로 실행될 수도 있고, 의존성을 어떻게 설정하느냐에 따라 순서에 따라 실행될 수도 있다.
다음은 bash 스크립트를 실행하여 결괏값을 받아오기 위해 BashOperator를 사용한 예제이다.

# 1. BashOperator 사용 - bash 스크립트를 실행하여 결괏값을 받아온다
download_launches = BashOperator(
    task_id="download_launches", # 태스크 명으로 DAG의 노드에 표시되는 이름이다.
    bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'",  
    dag=dag, # dag 참조함으로써 Airflow에게 어떤 DAG에 포함되어있는지를 알린다.
)

Airflow는 Task를 다양한 방법으로 수행할 수 있도록 다양한 Operator를 제공한다.
PythonOperator는 Python 함수를 실행할 수 있게 해준다. PythonOperator에서 python_callable 인수는 실행할 함수를 가리킨다. callable 함수로 설정할 함수를 먼저 생성하고 PythonOperator 생성 시 python_callable 의 인자로 해당 함수를 넘겨준다.

# 2. PythonOperator 사용 - Operator의 콜러블에 전달할 함수명을 태스크 이름과 동일하게 지어주고 앞에 _를 붙인다.
#                         결괏값을 파싱하고 모든 로켓 사진을 다운로드한다.
def _get_pictures():
    # 디렉토리가 있는 지 확인한다.
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)

    # launches.jso으로 이미지를 다운로드
    with open("/tmp/launches.json") as f:
        launches = json.load(f)
        image_urls = [launch["image"] for launch in launches["results"]]
        for image_url in image_urls:
            try:
                response = requests.get(image_url)
                image_filename = image_url.split("/")[-1]
                target_file = f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")

get_pictures = PythonOperator(
    task_id="get_pictures", 
    python_callable=_get_pictures, # 콜러블 함수를 설정한다.
    dag=dag
)

Operator과 Task의 차이점

사용자 관점에서는 Operator와 Task를 같은 의미로 이해할 수 있고, 또 그렇게 사용되고 있지만
Airflow관점에서는 Task는 작업의 올바른 실행을 보장하기 위한 Operator의 Wrapper/Manager로 생각할 수 있다 (..? 이해가 가지 않음)

Task간 의존성 설정

생성한 Task를 올바른 순서로 실행하기 위해서 >> (오른쪽 시프트 연산자)로 Task간 의존성을 설정한다.
이를 통해 download_launches 가 완료된 후에만 get_pictures 가 실행이 되고, notify 도 마찬가지로 get_pictures의 실행이 완료된 후에야 Task 실행을 시작할 수 있다.

download_launches >> get_pictures >> notify

작성한 DAG를 Airflow 인터페이스에서 확인하기

앞서 DAG를 download_launches >> get_pictures >> notify 와 같이 실행되도록 정의하였다.
나는 Docker기반으로 Airflow를 실행시켜 DAG를 확인했다.

profile
거친 돌이 다듬어져 조각이 되듯

0개의 댓글