Airflow 코드의 기본 구조
- DAG를 대표하는 객체를 먼저 만듦
- DAG를 구성하는 태스크들을 만듦
- 태스크별로 적합한 오퍼레이터를 선택
- 테스크 ID를 부여하고 해야할 작업의 세부사항 지정
- 최종적으로 태스크들간의 실행 순서 결정
Airflow.cfg
- DAGs 폴더는 어디에 지정되는가?
: 기본적으로는 Airflow가 설치된 디렉토리 밑의 dags폴더가 되며 dags_folder키에 저장됨
- DAGs 폴더에 새로운 DAG를 만들면 언제 실제로 Airflow 시스템에서 이를 알게 되는가? 이 스캔 주기를 결정해준는 키의 이름이 무엇인가?
: dag_dir_list_interval(기본값은 300=5분)
- 이 파일에서 Airflow를 API 형태로 외부에서 조작하고 싶다면 어느 섹션을 변경해야하는가?
: api 섹션의 auth_backend를 airflow.api.auth.backend.basic_auth로 변경
- Variable에서 변수의 값이 encrypted가 되려면 변수의 이름에 어떤 단어들이 들어가야 하는데 이 단어들은 무엇인가?
: password, secret, passwd, authorization, api_key, apikey, access_token
- 이 환경 설정 파일이 수정되었다면 이를 실제로 반영하기 위해서 해야 하는 일은?
: sudo systemctl restart airflow-webserver
: sudo systemctl restart airflow-scheduler
- Metadata DB의 내용을 암호화하는데 사용되는 키는 무엇인가?
: fernet_key
Aiflow와 타임존
- airflow.cfg에는 두 종류의 타임존 관련 키가 존재
: default_timezone, default_ui_timezone
- start_date, end_date, schedule
: default_timezone에 지정된 타임존을 따름
- execution_date와 로그 시간
: 항상 UTC를 따름, 즉 execution_date를 사용할 떄는 타임 존을 고려해서 변환 후 사용필요
DAG 설정 예제
- task 공통 설정
모든 task의 공통 설정을 dictionary를 만들어 지정
from datetime import datetime,timedelta
default_args = {
'owner':'anso',
'email':['anso@mail.com'],
'retries':1,
'retry_delay':timedelta(minutes=3),
'on_failure_callback': func,
'on_success_callback': func,
}
- DAG 생성
from airflow import DAG
dag = DAG(
"dag_v1",
start_date = datetime(2020,8,7,hour=0,minute=00),
schedule="0 * * * *",
tags=["example"],
catchup=False,
default_args=default_args
Bash Operator 예제
- 3개의 태스크로 구성
- t1은 현재 시간 출력
- t2는 5초간 대기 후 종료
- t3는 서버의 /tmp 디렉토리의 내용 출력
- t1이 끝나고 t2와 t3를 병렬로 실행
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner':'anso',
'email':['anso@mail.com'],
'retries':1,
'retry_delay':timedelta(minutes=3),
}
test_dat = DAG(
"dag_v1",
schedule="0 9 * * *",
tags=['test'],
catchUp=False,
default_args=default_args
)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dat=test_dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
dag=test_dag)
t3 = BahsOperator(
task_id='ls',
bash_command='ls/tmp',
dag=test_dag)
t1 >> [t2,t3]