Airflow Introduction

Dongmin Lee·2023년 5월 20일
0

Airflow

목록 보기
1/2
post-thumbnail

1. Apache Airflow

  • Airbnb에서 만든 워크플로우(workflow) 관리 툴
  • Python framework를 이용하여 배치 지향 워크플로우(batch-oriented workflow)를 작성하고 스케줄링 및 모니터링하는 플랫폼
  • 워크플로우를 Task의 DAG(directed acyclic graph)로 작성한다.
  • 다양한 Task들을 병렬적으로 실행 가능(parellelly executable)
  • Airflow webserver에서 제공해주는 웹 인터페이스를 통해 워크플로우 관리 및 모니터링이 용이하다.
  • Airflow 인스턴스에 쉽게 플러그인을 설치할 수 있어서 customizing하기가 쉽다.
  • 데이터 웨어하우스, 머신러닝, 데이터 분석, A/B테스트, 데이터 인프라 관리 등 여러 데이터 엔지니어링 환경에 유용하게 쓰인다.

2. Key Concepts

2.1 DAG(Directed Acyclic Graph)

  • 방향성이 있는 순환하지 않는 그래프
  • A -> B -> C (O) / A -> B -> C -> A (X)

2.2 Operator

  • Task을 정의하는 데에 사용된다.
  • Task의 Warpper 역할

2.3 Task

  • Operator를 실행시키면 Task가 된다.
  • Task = Operator Instance

2.4 Workflow

  • 작업 흐름이라는 뜻
  • 의존성으로 연결된 Task들의 집합
  • 가장 작은 단위인 Operator들이 모여서 Task들이 되고, Task들이 모여 하나의 DAG가 되고, DAG들이 모여 하나의 Workflow가 된다.

3. Architecture

Airflow에서 제공하는 Executor 중 Celery Executor를 사용한다고 가정하자.

3.1 Airflow Webserver

  • Airflow UI
  • 워크플로우의 상태를 표시하고, 실행, 재시작, 수동 조작, 로그 확인 등을 할 수 있다.
  • Airflow의 metastore에 저장된 로그를 보여주거나 DAG들을 시각화해서 제공한다.

3.2 Airflow Scheduler

  • 주기적으로 DAG과 Task들을 모니터링하고 실행할 시기와 순서를 결정한다.
  • 지속적으로 dags 폴더에 있는 DAG 파일들을 구문 분석하여 메모리에 각 파일에 대한 DAG를 생성한다.
  • 각 DAG에 대해 스케줄링 파라미터와 Task 간의 종속성을 확인한다.
  • Task의 상태가 "scheduled"인 경우, 스케줄러는 Executor에게 Task를 queue에 넣도록 지시한다.
  • 스케줄러에 의해 이루어지는 모든 작업들은 metastore에 저장된다.

3.3 Celery Executor

  • 스케줄러에서 실행하도록 표시된 Task를 수행하고 실제 Task의 실행을 관리한다.
  • Celery의 동작 방식에 따라 queue를 이용하여 각 Worker에게 Task를 분산한다.
  • Queue에 Task에 대한 메시지를 추가한다. (e.g., DAG ID, Task ID, 실행 날짜 등 포함, 실제 코드는 미포함)
  • Executor에는 Squential Executor, Local Executor, Celery Executor, Kubernetes Executor 등이 있다. (Executor 종류에 대한 더 자세한 내용은 링크 참고)

3.4 Airflow Workers

  • 실제 Task를 수행한다.
  • Worker들은 queue에서 받은 메시지에 따라 DAG 폴더에서 해당 DAG을 찾고 DAG에 있는 해당 Task를 처리한다.
  • Worker들은 Task가 완료된 후 metastore에 Task의 상태를 업데이트한다.

3.5 Airflow Flower

  • Celery UI
  • Worker 상태, queue 상태 등을 확인할 때 사용한다.

3.6 Message Queue(Broker)

  • 스케줄러에서 Worker로 메시지를 전달하는 message queue
  • Celery에서는 Redis나 RabbitMQ만 사용 가능하다.

3.7 Metastore(Database)

  • 메타데이터가 저장되는 데이터베이스
  • Tasks, DAGs, 각종 변수들, connection 정보 등 상태에 대한 메타데이터 저장한다.

4. How DAG Works

  1. 유저가 DAG를 작성하여 DAG 폴더에 배치
  2. 스케줄러가 DAG를 파싱하여 metastore에 저장
  3. 스케줄러가 metastore를 통해 DagRun 오브젝트를 생성 (DagRun은 유저가 작성한 DAG의 인스턴스)
  4. DagRun status: Running (문제가 없으면..?)
  5. DagRun 오브젝트가 각 Task 인스턴스를 만듦
  6. 스케줄러가 각 Task 인스턴스를 metastore에 생성하고, 생성된 Task 인스턴스들을 Executor로 전달
  7. Executor의 방식에 따라 Worker들이 Task를 실행
  8. Task 완료 후 Worker들이 metastore에 상태 및 결과를 저장
  9. 스케줄러는 각 Task의 실행이 완료 되었는지 확인
  10. DagRun status: Completed
  11. 웹 서버가 metastore 정보를 읽어서 UI를 업데이트함

0개의 댓글