[Airflow] 에어플로우 시작하기 : 개념 및 설치

nooyji·2022년 6월 27일
0

Airflow는 복잡한 워크플로우를 프로그래밍 방식으로 작성해서, 스케줄링하고 모니터링할 수 있는 플랫폼이다.

데이터 파이프라인을 이루고 있는 ETL 스크립트들을 스케줄링할 때 crontab, cloudwatch 등을 사용하는 곳이 많다. 그러나 스크립트들이 많아지고 서로에 대한 의존성이 생기게 되면 컨트롤하기 어렵고, 기존 작업이 실패했을 때 다시 스크립트를 실행하려면 로그를 확인하고 실행해야 하는 등의 문제점이 생긴다. 이러면 바로 복구할 수도 없고, 어디서 잘 못 되었는지 확인하기도 어렵고, 의존성이 있는 스크립트가 잘 못되었는데 그 다음 스크립트가 실행이 되버리는 등의 문제점이 발생할 수 있다.

Airflow에는 서로에 대한 의존성을 표현할 수 있고, 스크립트가 실패했을 때 알람을 보내 확인하고 쉽게 수정 후 재시도할 수 있고, 이전 날짜 작업이 실패했을 때 그 날짜만 다시 실행하는 등 위의 문제점을 많이 해결해준다.

DAG(Directed Acyclic Graph)
DAG 는 유향 비순환 그래프라고 하며 에어플로우의 워크플로우는 python을 사용하여 작성할 수 있다.
하나의 DAG 안에는 한 개 이상의 Task가 있으며, Task는 실제 실행시키는 작업이다.
DAG 스크립트의 예시는 다음과 같다.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from utils.alert import on_failure, on_success

default_args = {
'owner': 'airflow',
'catchup': False,
'execution_timeout': timedelta(hours=6),
'depends_on_past': False,
'on_failure_callback': on_failure,
'on_success_callback': on_success,
}

dag = DAG(
'sample_dag',
default_args = default_args,
description = "sample description",
schedule_interval = "0 16 * * *",
start_date = days_ago(2),
tags = ['daily'],
max_active_runs = 3,
concurrency = 1
)

sample_task = BashOperator(
task_id = "sample_task",
bash_command = 'python3 sample_task.py',
dag = dag)

Operator

에어플로우는 Operator를 사용하여 python, bash, aws, slack 등 다양한 동작을 실행시킬 수 있다.
예를 들어서 아래처럼 BashOperator를 사용하면 bash 스크립트를 실행할 수 있다.

다른 예시로 SlackOperator를 사용하면 slack 에 메세지를 보낼 수 있다.

alert = SlackAPIPostOperator(
task_id = 'slack_failed',
channel = channel,
token = token,
text = text)
return alert.execute(context=context)

Executor
작업 실행을 시켜주는 실행기이다. SequentialExecutor, CeleryExecutor, LocalExecutor 등 실행 방법에 따라 종류를 선택할 수 있다.

Airflow 간단하게 설치하기

Airflow는 기본값으로 sqlite를 사용하며 이 경우 Executor는 SequentialExecutor를 사용하게 된다. 하지만 이것을 사용하게 되면 동시에 여러 작업을 실행시킬 수 없기 때문에 mysql을 간단하게 설치해서 사용하려고 한다.

MySQL 설치 및 설정하기

  1. my.cnf 파일을 생성하고 아래의 내용을 붙여넣는다.
# my.cnf
[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
secure-file-priv = NULL

# Custom config should go here
!includedir /etc/mysql/conf.d/
explicit_defaults_for_timestamp = 1
default_authentication_plugin = mysql_native_password

caching_sha2_password 에러가 발생할 경우 default_authentication_plugin = mysql_native_password 옵션을 사용해주어야 한다.
explicit_defaults_for_timestamp 에러가 발생할 경우 explicit_defaults_for_timestamp = 1 옵션을 사용해주어야 한다.

  1. Dockerfile을 생성 후 빌드한다.
FROM mysql
COPY my.cnf /etc/mysql/my.cnf
docker build -t pipeline-db .
  1. 빌드한 도커를 실행한 후 도커에 접속한다.
docker stop pipeline-db && docker rm pipeline-db
docker run -d --name pipeline-db -e MYSQL_ROOT_PASSWORD=your_password -p 3306:3306 -v ~/mysql:/var/lib/mysql pipeline-db
docker logs pipeline-db
docker exec -it pipeline-db bash
mysql -u root -p
  1. 아래의 코드를 입력하여 airflow에 필요한 데이터베이스, 유저 등을 생성하고 설정한다.
CREATE DATABASE airflow CHARACTER SET UTF8mb3 COLLATE utf8_general_ci;
CREATE USER 'airflow'@'localhost' IDENTIFIED BY 'your_password';
GRANT ALL PRIVILEGES ON airflow.* TO 'airflow' @localhost;
CREATE USER 'airflow'@'%' IDENTIFIED BY 'your_password';
GRANT ALL PRIVILEGES ON airflow.* To 'airflow'@'%';
flush privileges;

데이터베이스를 생성할 때 아래의 명령어로 생성하면 된다.

CREATE DATABASE airflow CHARACTER SET UTF8mb3 COLLATE utf8_general_ci;

Airflow 설치하기

  1. pip로 airflow를 설치한다.
python3 -m pip install apache-airflow
  1. airflow 계정을 생성한다.
airflow users create --username admin --firstname airflow --lastname your_lastname --role Admin --email airflow@sample.com -p your_password
  1. $AIRFLOW_HOME 경로 (기본값 ~/airflow) 에서 airflow.cfg 파일을 열고 아래와 같이 수정한다.
# airflow 예제 제거하기
load_example = True

# DAG 업데이트 시간 조정하기 (UI), 이 시간을 0으로 둘 경우 아주 많은 CPU를 사용하게 된다.
min_file_process_interval = 60
dag_dir_list_interval = 30

# 데이터베이스 연결하기
sql_alchemy_conn = mysql://airflow:your_password@127.0.0.1:3306/airflow?charset=utf8

# Executor 설정하기
# SequentialExecutor(Default)를 사용하면 한 번에 하나의 작업만 처리할 수 있다.
executor = LocalExecutor
parallelism = 64
dag_concuttency = 32
  1. airflow webserver 실행하기
screen -dmS airflow-webserver airflow webserver --port 7171

웹서버에 문제가 생길 경우 종료해야 하는데 이 때 pid 는 $AIRFLOW_HOME/airflow-webserver.pid 경로에서 찾을 수 있다.

  1. airflow scheduler 실행하기 (screen을 사용한 방법)
    ssh 연결이 끊기더라도 계속 실행되게 하기 위해 screen을 사용하는 방법이다.
screen -dmS airflow-scheduler airflow scheduler

스크린 사용 방법은 다음과 같다.

# 로그 보는 방법
screen -r 세션 이름
ctrl + a, d 로 Detach

# 종료하는 방법
screen -r 세션 이름
Ctrl + c 로 종료

ps-df | grep airflow 명령어로 동작중인 프로세스를 확인할 수 있다.

  1. {your_server_ip}:7171 로 접속하면 Airflow 웹서버를 확인할 수 있으며 toggle 을 파란색 (Unfuse) 로 두면 스케줄 동작을 시작할 수 있다.

  2. 작업 중지하는 방법은 DAG의 Task UI에서 Make Failed 버튼을 누르면 되고, 작업을 재시작하는 방법은 Clear 버튼을 클릭하면 된다.

  3. 한 번에 실행되는 횟수가 16개인 이유는 Max Active Runs가 16이어서 그런데, DAG을 생성할 때나 airflow.cfg 파일을 수정해주면 된다.

Airflow 간단한 DAG 생성하기
$AIRFLOW_HOME/dags 경로에 py파일로 스크립트를 생성하면 airflow가 아까 설정했던 min_file_process_interval 시간에 맞춰 파일을 확인하고 업데이트 한다.

  1. 위의 경로에 sample.py 파일을 생성한다.
  2. 아래의 스크립트를 붙여넣는다.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

default_args = {
'owner': 'airflow',
'catchup' : False,
'execution_timeout': timedelta(hours=6),
'depends_on_past': False,
}

dag = DAG(
'sample',
default_args = default_args,
description = "sample description",
schedule_interval = "@daily",
start_date = days_ago(3),
tags = ['daily'],
max_active_runs = 3,
concurrency = 1
)

sample_a = BashOperator(
task_id = 'sample_a',
bash_command = 'echo hello',
dag = dag)

sample_a = BashOperator(
task_id = 'sample_a',
bash_command = 'echo hello',
dag = dag)

sample_a << sample_b
  1. 저장하고 기다린 후 airflow web 을 확인해보면 새로운 DAG가 생성된 것을 볼 수 있다.

  2. toggle 로 unfuse 상태로 만들면 실행이 잘 되는 것을 확인할 수 있다.

원문 : https://data-engineer-tech.tistory.com/30

0개의 댓글