[Airflow] Trouble Shoot 및 기본 예제 실행

geun·2023년 2월 25일
0

Airflow

목록 보기
2/2
post-thumbnail

리소스 관련 Trouble Shoot

이전 글에서 Airflow 컨테이너들의 리소스가 너무 많이 차지하는 이슈가 있었습니다.
따라서 제가 예상한 부분을 먼저 해결해보고자 Apache에서 기본적으로 제공하는 예제들을 초기화시 생성하지 않도록 설정하고 Airflow를 실행시켜 보았습니다.

기본적으로 제공하는 예제들을 생성하지 않는 코드는 아래와 같습니다.

# docker-compose.yml 파일의 아래 내용을 수정합니다.

$ vim docker-compose.yml

>>
version: '3'
x-airflow-common:
  &airflow-common
  ## 생략 ##
  environment:
    &airflow-common-env
	  
		## 생략 ##
    
		AIRFLOW__CORE__LOAD_EXAMPLES: 'false'  ## true -> false로 수정

결과는 위와 같습니다.

평소 리소스 사용량은 약 40% 정도를 차지하고 Airflow workflow triggerer 가 지정된 schedule 작업들을 트리거 하면서 리소스가 늘어났을 때 70% 정도 차지하는 것을 볼 수 있습니다.

아직 완벽하게 원인을 찾아서 이슈를 해결한 것은 아니지만, Airflow에 대해 처음 학습하는 것이기 때문에 예제들을 다뤄보며 추후에 조금씩 최적화 작업을 해보도록 하겠습니다!


Airflow basic example

저번 글에서 Airflow Workflow의 주요 컴포넌트들과 동작과정 등을 간략하게 살펴보았습니다.

이번 글에서는 직접 example을 다뤄보면서 실제로 Airflow가 어떤 식으로 동작하는지 직접 사용해보고 경험해보겠습니다!

Airflow의 Workflow는 DAG 들로 구성되는데 이 DAG 는 Python 코드로 작성됩니다. 따라서 예제를 따라서 Python 코드로 DAG를 작성해보겠습니다

저는 Docker Compose 를 이용해서 Airflow 환경을 구성했기 때문에 Airflow 컨테이너의 DAG를 모아두는 폴더가 Docker의 볼륨 기능을 이용해서 호스트의 dags폴더와 연결되어 있습니다. 따라서 해당 폴더에 dag_basic_example.py 파일을 생성하고 그 안에 파이썬 코드를 작성해보도록 하겠습니다.

작성한 코드는 아래와 같습니다.

from datetime import datetime, timedelta
from textwrap import dedent

# DAG 객체로써 DAG를 인스턴스화 하는데 필요합니다.
from airflow import DAG

# BashOperator는 Airflow에서 가장 기본적인 Operator입니다.
# 이 Operator는 하나의 Bash 명령어를 실행합니다.
# 각 args는 Operator의 생성자에 전달됩니다.
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'owner-name',
    'depends_on_past': False,
    'email': ['your-email@g.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=15),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
with DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2022, 2, 1),
    catchup=False,
    tags=['example-sj'],
) as dag:
    
    # t1, t2, t3는 각각의 테스크이며 BashOperator를 사용합니다.
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )
    
    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )
    
    t1.doc_md = dedent(
    """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    """
    )

    dag.doc_md = __doc__  # DAG가 시작할 때 DocString을 보여주고 싶다면
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # 그렇지 않다면 아래와 같이 작성합니다.
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
    )
    
    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
    )
    
    t1 >> [t2, t3]

DAG 작성 시 체크 해야할 포인트는 아래와 같습니다.

Airflow Operator

Airflow는 Operator를 이용해서 Task를 정의합니다.
(위 코드에서는 BashOperator를 이용해서 Task를 정의했습니다.)

Operator의 종류는 Python, Bash, KubernetesPodOperator 등 여러 Operator가 존재하고 각 Operator 마다 수행하는 목적과 주체 등이 다릅니다.

예제에서 사용한 BashOperator는 Airflow에서 가장 기본적인 Operator입니다. 모든 Operator는 Airflow에서 작업을 실행하는 데 필요한 모든 인수를 포함하는 BaseOperator에서 상속됩니다.

Default Args

DAG를 구성할 때 필요한 default argument를 미리 구성할 수 있습니다.

Task

DAG 에서 Operator를 사용하기 위해서는 작업으로 인스턴스화 해야합니다.
예제 코드에서는 t1, t2, t3가 BashOperator를 인스터스화 한 작업입니다. 인스턴스화 된 각 Task의 첫 번째 Parameter인 task_id를 정의함으로써 각 Task별 고유 식별자를 가질 수 있습니다.

생성된 DAG 확인해보기

dag 폴더에 예제 DAG를 저장했다면 Airflow Scheduler에 의해서 docker compose에 정의한 주기마다 해당 폴더에서 DAG를 찾아서 가져오는 것 같습니다.

Airflow 환경을 구성하면서 같이 띄워둔 Airflow Web Server 에 접속해서 확인해보면 작성한 DAG가 추가되어 있는 것을 확인할 수 있습니다.

작성한 DAG를 클릭해보면 해당 DAG의 다양한 정보를 시각적으로 확인해 볼 수 있습니다. 간단하게 확인해본 정보를 나열해보자면 DAG의 Grid 구성, Workflow Graph, Calendar, 실행 시 소요시간 기록 등 다양한 Workflow 관련 기록을 시각화된 상태로 볼 수 있습니다.

직접 DAG를 Trigger 해서 실행해보겠습니다.

정상적으로 DAG가 실행되었는지 확인하는 방법은 여러가지가 있겠지만, 간단하게 Log를 확인해보겠습니다.

관련 Log는 Airflow Web에서도 확인이 가능하고 logs 디렉토리에서도 확인이 가능합니다.

Airflow Web을 확인해보고 있었으니 Web을 통해 Log를 확인해보겠습니다.

실행한 DAG창에서 Details → Success → 확인하고 싶은 Task를 클릭합니다.

실행에 성공한 Task들을 확인 할 수 있는데 더 자세하게 확인해보기 위해 제일 상위에 있는 Task의 Id를 클릭해보겠습니다.

해당 Task가 언제 실행되었는지, Task의 Documentation은 어떤 내용인지 등 다양한 정보를 확인해볼 수 있습니다.


이번 글에서는 간단하게 Trouble Shoot을 진행하고 DAG 기본 예제를 실행해봤습니다!
다음 글에서 Airflow를 조금 더 자세하게 다뤄보겠습니다!


참고한 자료


profile
말하는 감자🥔에서 기록도 하는 감자🥔로 거듭나기

0개의 댓글