Airflow

mnsukoo·2022년 6월 8일
0

data

목록 보기
1/1

AirBnB에서 만든 workflow management tool
Python 코드로 워크플로우(workflow)를 작성하고, 스케쥴링, 모니터링 할 수 있는 플랫폼

일련의 작업 흐름
Not ETL → workflow 및 파이프라인을 관리하는 툴

관리란?
workflow 작성 → 스케쥴링 → 모니터링

Airflow Component

Worker

실제 task를 실행하는 주체자. Executor 종류에 따라 동작 방식이 다양

Executor

실행중인 task를 handling하는 component

default 설치시에는 scheduler에 잇는 모든 것을 실행하지만 production수준에서의 executorsms worker에게 task를 push.

Scheduler

airflow로 할당된 work들을 스케쥴링 하는 conponent

schuduled된 workflow들의 triggering과 실행하기 위해서 executor에게 task를 제공해주는 역할을 수행

Database

Airflow에 존재하는 DAG와 Task들의 메타데이터를 저장하는 데이터베이스

Dag

Directed Acyclic Graph로 개발자가 Python으로 작성한 워크플로우. Task들의 dependency를 정의.

DAG(Directed Acyclic Graph)

실행하고 싶은 Task들의 관계와 dependency를 표현하고 있는 Task들의 모음.
어떤 순서와 어떤 dependency로 실행할지, 어떤 스케줄로 실행할지 등의 정보를 포함
따라서 DAG를 정확하게 설정해야 Task를 원하는 대로 스케쥴링할 수 있음.

  • Dag는 Task로 구성됨
  • 각 Task는 Operator 클래스를 인스턴스화 하여 만든다
# ex)
my_task = BashOperator(...)
  • 각 Task에서 오류가 발생할 경우 여러번 재실행 할 수 있다.
    • Airflow를 완전히 멈췄다가 미완료 Task를 재 시작하면서 실행 중이던 업무 흐름으로 되돌아갈 수 있음
  • Dag가 실행되면 Airflow는 데이터베이스에 Dag 실행 항목 생성
  • 단방향 비순환 그래프로 설계
    • 작업 흐름을 짤 때 그것이 어떻게 독립적으로 실행 가능한 태스크들로 나뉠 수 있을까 생각해봐야 함

Operator

Airflow는 기본적인 Task를 위해 다양한 Operator를 제공

  • BashOperator : bash command 실행
  • PythonOperator : Python 함수 실행
  • EmailOperator : Email 발송
  • MySqlOperator : sql 쿼리 수행
  • Sensor : 시간, 파일, db row, 등을 기다리는 센서, 시간, 파일, 외부 이벤트를 기다리며 해당 조건을 충족해야만 이후의 작업을 진행

Airflow를 사용하는 경우

  • 데이터 흐름을 자동으로 구성, 실행 및 모니터링해야 하는 경우
  • 파이썬 코드를 이용해 파이프라인 구현이 가능하기때문에 파이썬 언어에서 구현할 수 있는 대부분의 데이터 핸들링 가능
  • 반복적인 배치 Task 실행 기능에 적합
  • 추가 및 삭제 Task가 빈번한 동적 파이프라인에는 부적합

Airflow 사용

Airflow는 DAG를 관리하는 디렉토리를 지정해 관리함
→ airflow.cfg 에서 설정

  • 해당 경로에 있는 dag 정보를 이용해서 dags를 표시하고 관리함
  • dag는 python코드를 기반으로 작성되기 때문에 .py 파일로 만들어야함

Default Database 변경 : SQLite to PostgreSQL

기본 SQLite를 제공하지만 PostgreSql로 변경하는 이유
→ SequentialExecutor 가 아닌 다른 Executor를 사용하기 위해
SequentialExcutor는 병렬처리를 제공하지 않기 때문에 DAG에서 같은시간으로 스케쥴러를 실행해 놓아도 순차적 실행됨

airflow.cfg 파일 수정

# 수정 전
executor = SequentialExecutor
...
sql_alchemy_conn = sqlite:////home/minsukoo/airflow/airflow.db

# 수정 후
executor = LocalExecutor
...
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow1!@localhost/airflow코드를 입력하세요

0개의 댓글