MWAA (Managed Workflows for Apache Airflow) 사용Cloud Composer 사용Azure Data Factory에 Airflow DAGs 기능 존재substitue user의 약자로 현재 사용 중인 사용자 계정을 로그아웃하지 않고 다른 사용자의 권한을 얻을 때 사용하며 sudo와 함께 사용된다. sudo su postgres 혹은 sudo su aws 명령어가 사용된다.1. EC2 인스턴스 생성
EC2 인스턴스를 Ubuntu Server 20.04를 선택해 준다.t3.small이나 t3a.small을 선택해 준다.Key pair를 생성해 주어야 한다. 만약 기존 사용 중인 Key pair가 존재한다면 해당 Key pair를 사용해도 된다.Security group은 Port 번호 24와 8080가 오픈되어야 한다. 2. SSH 로그인
EC2의 DNS 서버 주소를 복사해서 다음과 같은 명령어에 붙인다.ssh -i airflow-dev.pem DNS_SERVER
chmod, 윈도우는 file property를 수정해 준다. ppk의 경우 putty key generator를 사용해 준다. 이건 같은 상황을 겪었을 때 Trouble Shooting에 대해 포스팅을 한 적이 있다. 🔑 chmod 명령어 Window에서 사용 시 해결 방법sudo apt-get update
3. python 3.0 설치
wget https://bootstrap.pypa.io/get-pip.py
sudo python3 get-pip.py
sudo apt-get install -y python3-pip
sudo pip3 install pyopenssl --upgrade
4. airflow 설치
sudo apt-get install -y libmysqlclient-dev
sudo pip3 install --ignore-installed "apache-airflow[celery,amazon,mysql,postgres]==2.5.1" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.7.txt"
sudo pip3 install oauth2client gspread numpy pandas
5. airflow 계정 생성
sudo groupadd airflow
sudo useradd -s /bin/bash airflow -g airflow -d /var/lib/airflow -m
6. Postgres 설치
Postgres를 사용하기 위해 Postgres를 설치해 준다.sudo apt-get install -y postgresql postgresql-contrib
Postgres 사용자로 재로그인해 준 후에 Postgres shell을 실행한 다음 airflow라는 이름의 계정을 생성한다. $ sudo su postgres
$ psql
psql (10.12 (Ubuntu 10.12-0ubuntu0.18.04.1))
Type "help" for help.
postgres=# CREATE USER airflow PASSWORD 'airflow';
CREATE ROLE
postgres=# CREATE DATABASE airflow;
CREATE DATABASE
postgres=# \q
$ exit -- 이를 사용하면 postgres shell에서 나갈 수 있다
Postgres 재시작해 준다.sudo service postgresql restart
7. Airflow 기본 환경 세팅
sqlite기 때문에 다음과 같이 Postgres로 변경해 주어야 한다.SequentialExecutor인데 Sequential Executor지만 제약이 많기 때문에 주로 LocalExecutor로 수정한다.sudo su airflow
$ cd ~/
$ mkdir dags
$ AIRFLOW_HOME=/var/lib/airflow airflow db init
$ ls /var/lib/airflow
airflow.cfg airflow.db dags logs unittests.cfg
Airflow 환경 파일에 접근해 주어 앞서 말했던 executor과 DB 연결 스트링인 sql_alchemy_conn을 수정해 보자.[core]
...
executor = LocalExecutor
...
[database]
...
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
...
AIRFLOW_HOME=/var/lib/airflow airflow db init

Docker Desktop을 설치해 주어야 한다. 현재 링크는 windows로 되어 있지만 OS 환경에 맞추어 설치가 가능하다.cd airflow-setup이라는 명령어를 통해 폴더를 이동해 준다.yaml 파일을 다운로드한다. docker-compose.yaml 파일은 Docker의 세팅 환경이다.curl -LfO "https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml" 해당 명령어로 yaml 파일을 다운받는다.docker-compose -f docker-compose.yaml pull 명령어를 통해 이미지를 다운로드 받을 수 있다.docker-compose -f docker-compose.yaml up 명령어를 통해 컨테이너를 실행해 준다.
default_arg를 먼저 설정해 주어야 한다. 이때 dictionary 구조로 구현한다.from datetime import datetime, timedelta
default_args = {
'owner': 'ssong',
'email': ['ssong@naver.com'],
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
나올 수 있는 설정 값이 많은데 on_failure_callback과 on_success_callback을 통해 실패, 성공 여부에 따라 행할 이벤트(이메일로 알림을 보내는 등)를 설정해 줄 수 있다.
메인 DAG를 설정해 준다.
from airflow import DAG
dag = DAG(
"dag_v1", # DAG name
start_date=datetime(2020,8,7,hour=0,minute=00),
schedule="0 * * * *",
tags=["example"],
catchup=False,
# common settings
default_args=default_args
)

catchup이란? catchup이 Truecatchup이 True이면 start_date와 오늘 사이에 실행이 안 된 날짜에 대해 DAG를 실행Full Refresh DAG라면 False로 설정
print_date를 누르면 print_date이 어떻게 실행되었는지 확인할 수 있다. (여기서 print_date는 태스크이다.)Clear를 선택해 주면 DAG를 재실행하게 된다.
Log를 선택하여 확인할 수 있다.docker ps라는 명령어를 통해 docker의 CONTAINER ID를 확인해야 한다.
scheduler가 들어간 CONTAINER ID를 확인한 후 docker exec -it CONTAINER ID sh를 실행해 준다.(airflow)가 뜨면 뒤에 필요한 코드를 입력해 주면 된다. 만약 DAG_V1의 태스크들을 알고 싶다면 airflow tasks list dag_v1을 입력해 준다.
airflow tasks test dag_v1 print_date 2023-06-08을 해 준다.
1. Executor 개념, 왜 Sequential Executor은 제약이 많을까?
- 개인적으로
Executor의 개념이 잘 이해가 가지 않아 추가적으로 공부해 보았다.Executor란airflow가 일을 처리하는 방식 즉, 작업 방식을 결정 짓는 것이다.- 예를 들어, 이런 DAG가 있다고 가정해 본다면 task_0에서 task_3까지는 의존 관계가 없기 때문에 병렬적으로 수행되는 게 더 효율적일 것이다. 물론 start >> task_0 >> task_1 >> task_2 >> task_3 >> end로 간다면 덜 리소스를 사용한다는 장점은 있겠지만 시간적인 측면에서는 더 오랜 시간이 걸리게 될 것이다.
- 이와 같이 어떻게 작업을 수행할지 방법을 지정해 주는 것이
Executor이다.- 그렇다면 왜 default인
Sequential Executor은 제약이 많을까?Sequential Executor는 병렬 프로세스를 적용할 수 없다. 병렬적으로 수행이 가능한 항목들도 순차적으로 진행이 되기 때문에 속도가 느리다.
2. Docker Engine Stopped
- 아직 실습도 안 했고 그냥 airflow만 실행하는데 갑자기 Docker Engine Stopped 상태가 돼서 많이 당황했다.
- 분명 실행됐었는데 다시 처음부터 진행해 보려 하니 다음과 같은 오류가 발생하였다.
error during connect: in the default daemon configuration on Windows, the docker client must be run with elevated privileges to connect- 다른 블로그 글들을 찾았는데 WSL 업데이트를 하며 해결했다고 하길래 WSL를 다시 다운로드 하려고 했더니 역시나 어제 최신 버전을 받아서 그냥 가장 최근 버전이라는 안내만 떴다.
- 해당 에러 검색해 봤는데 도커 커넥션 문제니까 도커를 켜고 재실행하면 된다고 했다.
- 그냥 로딩만 계속 돌아가고 RESTART 버튼이 활성화가 안 되는데요.
- 해당 문제에 대해 대부분 대책으로 Docker Desktop부터 새로 받길래 최종 보류로 두고 혹시나 하는 마음에 PC 재부팅 하니까 다시 RESTART가 됐다.
- 또 같은 현상이 일어나면 원인을 찾아 봐야 할 것 같다.
- 아무튼 C 드라이브 다 정리해 주고 20 GB 확보한 뒤에 드디어 만난 airflow localhost 화면.
- 이거는 트러블 슈팅이라고 할 수도 없어서 따로 빼 두었다.