Airflow 소개 및 환경 설정

Juheon Oh·2023년 1월 13일
0
post-thumbnail

Airflow란?


  • 데이터 파이프라인을 관리할 때 사용하는 툴
  • workflow를 시각화하고 모니터링하기 쉬움
  • 오픈소스이며 파이썬으로 작성
  • 가장 커뮤니티가 크고 레퍼런스도 많음
  • 제공하는 기능도 많고 다른 프레임워크와도 연동이 쉬움

DAG

  • 컬러 블럭 하나가 Task
  • DAG (Directed Acyclic Graph)은 순서에 맞게 구성된 Task의 집합
  • Airflow에서는 Python으로 DAG을 쉽게 작성할 수 있음

Use Case

  • ETL
  • AI/ML (데이터 전처리 및 재학습)
  • DevOps (반복되는 일을 자동화)

Components

1. Scheduler

  • Task들을 스케줄링하고 Worker에게 Task를 넘겨줌

2. Executor

Local Executors

  • Sequential Executor: 가장 기본적인 Executor이지만 병렬적으로 Task를 처리 불가능
  • Local Executor: 병렬적으로 Task 처리 가능

Remote Executors

  • 기본적으로 여러 Worker node를 통해 Task를 분산적으로 처리함
  • Celery Executor, Kubernetes Executor 등이 있음

3. Worker

  • Scheduler와 통신하고 Task를 실행하는 node

4. Web Server

5. Metadata Database

  • Workflow, Task의 상태, Log 등을 기록
  • 내가 실행하는 Workflow에 DB를 활용하지 않아도 설치해야 함

Airflow Setup


지금까지 Airflow에 대해 간략한 설명을 했고 이제부터는 사용법에 대해 설명..

수작업

1. 환경변수 설정

export AIRFLOW_HOME= "airflow폴더가 있는 절대경로"

ex) export AIRFLOW_HOME=/Users/juheon/Desktop/jh/project/airflow

2. Airflow db 초기화

airflow db init

3. Airflow 유저 추가

airflow users create --username admin \
--firstname admin \
--lastname admin \
--role Admin \
--email example@admin.com

4. airflow.cfg의 sql_alchemy_conn에 상대경로 대신 절대 경로 명시

Airflow 2.2.5부터 airflow.cfg에서 sql_alchemy_conn에 상대경로 대신 절대 경로를 써줘야함

왜그런가? ➡️ 관련 이슈를 보면 상대 경로로 설정하면 DB 연결이 안되는 케이스 때문

따라서 기존 설정값 대신 절대경로 명시해줌
sql_alchemy_conn = sqlite:/// .airflow.db ➡️
sql_alchemy_conn = sqlite:////Users/juheon/Desktop/jh/final-project-level3-cv-06/airflow/airflow.db

5. Airflow webserver & scheduler 실행

airflow webserver --port 8080

airflow scheduler # 다른 터미널에서 실행

직접 만든 패키지나 파일을 import 할때

Airflow에서는 AIRFLOW_HOME을 기준으로 import하기 때문에 잘 설정해줘야함.

Docker-Compose


매번 Airflow를 수작업으로 실행하기는 번거롭고 실행 환경에 따라 에러가 날 수 있기 때문에
Docker-Compose를 통해 실행해보자.

Docker-Compose 장점

  • Command 하나로 Airflow setup에 필요한 모든 component를 관리 가능
  • 다른 환경에서 Airflow setup 시 배포하기 쉽고 사용하기 편리함

docker-compose.yaml 파일

version: '3'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-ozoooooh/airflow:2.5.1-arm}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    AIRFLOW__CORE__DEFAULT_TIMEZONE: Asia/Seoul
    AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: Asia/Seoul
    SQLALCHEMY_SILENCE_UBER_WARNING: 1

    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:14.6
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
      - "./init.sql:/docker-entrypoint-initdb.d/init.sql"
    ports:
      - 5432:5432
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
    volumes:
      - .:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow

volumes:
  postgres-db-volume:
  • &airflow-common에서 & 기호는 변수를 지정
  • 같은 내용을 재사용할 때 사용
  • <<: *airflow-common로 불러올 수 있고 override를 통해서 수정 또는 추가도 가능

나같은 경우, (Airflow 기본 image + Task에 사용된 package) image 사용
Service는 postgre, webserver, scheduler, init, cli로 구성

  • 여기서 init은 metadata database를 initialize하는 역할
  • initialize을 하거나 이미 했었다면 자동으로 종료됨

cli는 Airflow에서 제공하는 CLI를 말함
이를 통해 Airflow Web 대신 터미널에서도 다양한 기능을 사용할 수 있음 (링크 참고)

사용법

docker compose up -d

해당 커맨드만 실행하면 Airflow setup 완료

정리

수작업으로 Airflow를 실행하는 것 보다는
Docker compose를 통해서 실행하면 더 쉽게 작업이 가능

0개의 댓글