Data Engineering Course For Begginers (Youtube)
ETL
- basically means to download the data or bring it on board
- Where? API vs FTP
- Format? Csv, Json, Xml
- Compressed vs Uncompressed
- validation stage
- what if something goes wrong? -> Garbage in, garbage out
- Primary key check
LOAD
- Relational(MySQL,Postgresql...) vs Nonrelational(MongoDB...)
- Location(Onpremise vs Cloud)
- ORM(Object Relational Mappers) : SqlAlchemy...
Apache Airflow Tutorial for Beginners
Airflow
Core Concepts
DAG(Directed Acyclic Graph) : 비순환 그래프, Workflow를 나타냄
Task : 작업단위
Operator : Task가 하는 작업의 유형을 나타냄(PythonOperator, BashOperator, ...)
Execution Date : logic date/time which the dag run
Task Instance : Task + Excution Date
Dag Run : DAG + Execution Date
Task Lifecycle
no status
(->scheduler) : 아직 scheduler를 거치지 않은 단계
scheduler
(->[scheduled,removed,upstream failed, skipped])
- scheduled(-> executor) : scheduler determined task instance needs to run
- upstream failed : the task's upstream task failed
- skipped : task is skipped
- removed : task have been removed
executor
(->[queued])
- ququed(->worker) : scheduler sent task to executor to run on the queue
worker
(->[running])
- running(->[success,failed,shut down, up for reschedule]) : worker picked up a task and is now running it
- success : task completed without error
- up for reschedule : reschedule task every certain time interval
- failed(->up for retry) : task failed
- shut down(->up for retry) : task run has been aborted
- up for retry : task will be scheduled and rerun the task after certain waiting time
Basic Architecture
- Webserver : 다양한 Web UI 제공, python Flask로 구현됨
- scheduler : job을 worker에게 전달, worker node가 여러개면 queue에 job을 보냄
- Woker : job을 받아 DAG를 수행
- Metadata DB : 실행관련된 여러가지 정보 저장(ex. log), default는 SQLlite, 실제 현업에서는 MySQL, Postgres와 같은 것을 이용
Airflow 설치
출처 : Running Airflow in Docker
# docker-compose.yaml 다운로드
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.2/docker-compose.yaml'
CeleryExecutor 를 LocalExecutor로 변경하고 CeleryExecutor 관련된 라인을 전부 삭제했다. LocalExecutor이기 때문에 redis, flower 삭제
# volume 을 저장할 디렉토리 생성
mkdir -p ./dags ./logs ./plugins
# initialization
docker-compose up airflow-init
# background 로 컨테이너 실행
docker-compose up -d
Airflow tutorial
Operator
BashOperator
: bash 명령어 실행하는 operator
PythonOperator
: Python code를 실행하는 operator
위의 것 외에도 수많은 operator 존재
Xcoms
- Xcom을 통해 데이터 공유 가능(최대 48KB)
- ti.xcom_pull(task_ids,key)
- task_id에 해당한 task가 push한 데이터중 key값을 가지는 데이터 Return
- ti.xcom_push(key,value)
- ti : task instance
Task Flow API
- dag, task 데코레이터를 활용하여 code를 더 간결하게 만들 수 있음
from airflow.decorators import dag, task
...
@dag (
...
)
def dag1():
@task()
def task1():
...
return value1
@task()
def task2():
...
return value2
@task()
def task3(value1,value2):
...
return value1 + value2
value1 = task1()
value2 = task2()
print(taks3(value1,value2))
dag1()
- value1과 value2가 xcom에 push되고 task3가 그 값을 pull함
- [task1,task2] >> task3 와 같은 task dependency가 자동으로 만들어짐
catchup, backfill
- catchup은 True/False 값을 가질 수 있으며, default 값은 True
- catchup=False : start_date이 현재보다 이전일때 가장 최근 것만 실행
- catchup=True : start_date이 현재보다 이전일때 start_date부터 현재까지 실행됬어야할 것들을 모두 실행
- backfill은 과거에 어떤 이유로 실행되지 않은 execution을 재실행해줌
- start_date, end_date을 파라미터로 범위를 지정가능
airflow dags backfill -s [start_date] -e [end_date] [dag_id]
명령어로 실행가능
- docker로 airflow로 실행한 경우에는 scheduler container의 shell에 접속해서 위 명령어를 입력해주면 됨
cron expression
- dag의 schedule_interval을 '@daily'와 같은 형태로도 가능하지만 cron expression으로도 가능
- cron expression은 5개의 부분으로 나눠지고 각 부분은 공백으로 구분됨
- 순서대로 Minute, Hour, Day, month, week 를 나타냄
-*
은 every 를 뜻함
- 예를 들어
0 * * * *
는 매 시 정각을 뜻함
- 각 부분에 여러개의 값을 넣을 수도 있음 이때는
,
로 구분함
- 예를들어
0,30 * * * *
은 매 시 정각, 30분을 뜻함
type backfill vs scheduled
Connection
- airflow는 여러개의 service와 연결되어 있음
- connection은 이러한 service와 연결을 도와줌
- connection은 주로 Operator가 많이 사용함(너무 당연한 소리인가?)
- connection을 사용하게 되면 credential(Host,User,PW)를 코드에서 숨길 수 있음
- 환경변수로 선언할 수도 있고 Web UI를 통해서도 할 수 있음
- 환경변수로 선언할때는 보통
AIRFLOW_CONN_[conn_id]: [conn_type]://id:password@host:port
가 일반적임(conn_type에 따라 바뀔 수 있음)
- docker-compose up -d --no-deps --build airflow-webserver airflow-scheduler 를 수행해 container를 recreate 시켜서 변경사항 적용가능
Install Python package
- Extending
- dockerfile 작성해서 requirements.txt 설치
- 99%의 경우 사용, 빠름, 쉬움
- Customizing
- apache/airflow git clone해서 docker-context-files 에 requirements.txt 생성후
- docker build . --build-arg AIRFLOW_VERSION=.. --tag ... 해서 이미지 생성
- extending으로 안되는 경우, 이미지 크기 최적화가 가능
Sensor
- A special type of operator which waits for something to occur
- tool for use cases in which you don't know exactly when the data will be available