2023/12/13

anso·2023년 12월 12일
0

TIL

목록 보기
19/20
post-thumbnail

Airflow 코드의 기본 구조

  1. DAG를 대표하는 객체를 먼저 만듦
    • DAG 이름, 실행주기, 실행날짜, 오너 등
  2. DAG를 구성하는 태스크들을 만듦
    • 태스크별로 적합한 오퍼레이터를 선택
    • 테스크 ID를 부여하고 해야할 작업의 세부사항 지정
  3. 최종적으로 태스크들간의 실행 순서 결정

Airflow.cfg

  1. DAGs 폴더는 어디에 지정되는가?
    : 기본적으로는 Airflow가 설치된 디렉토리 밑의 dags폴더가 되며 dags_folder키에 저장됨
  2. DAGs 폴더에 새로운 DAG를 만들면 언제 실제로 Airflow 시스템에서 이를 알게 되는가? 이 스캔 주기를 결정해준는 키의 이름이 무엇인가?
    : dag_dir_list_interval(기본값은 300=5분)
  3. 이 파일에서 Airflow를 API 형태로 외부에서 조작하고 싶다면 어느 섹션을 변경해야하는가?
    : api 섹션의 auth_backend를 airflow.api.auth.backend.basic_auth로 변경
  4. Variable에서 변수의 값이 encrypted가 되려면 변수의 이름에 어떤 단어들이 들어가야 하는데 이 단어들은 무엇인가?
    : password, secret, passwd, authorization, api_key, apikey, access_token
  5. 이 환경 설정 파일이 수정되었다면 이를 실제로 반영하기 위해서 해야 하는 일은?
    : sudo systemctl restart airflow-webserver
    : sudo systemctl restart airflow-scheduler
  6. Metadata DB의 내용을 암호화하는데 사용되는 키는 무엇인가?
    : fernet_key

Aiflow와 타임존

  • airflow.cfg에는 두 종류의 타임존 관련 키가 존재
    : default_timezone, default_ui_timezone
  • start_date, end_date, schedule
    : default_timezone에 지정된 타임존을 따름
  • execution_date와 로그 시간
    : 항상 UTC를 따름, 즉 execution_date를 사용할 떄는 타임 존을 고려해서 변환 후 사용필요

DAG 설정 예제

  1. task 공통 설정
    모든 task의 공통 설정을 dictionary를 만들어 지정
from datetime import datetime,timedelta

default_args = {
	'owner':'anso',
    'email':['anso@mail.com'],
    'retries':1,  # 실패시 최대 재시도 횟수
    'retry_delay':timedelta(minutes=3), # 재시도할때 대기시간
    'on_failure_callback': func, # 실패 시 다음 함수 실행
    'on_success_callback': func, # 성공 시 다음 함수 실행
}
  1. DAG 생성
from airflow import DAG

dag = DAG(
	"dag_v1",  # DAG 이름
    start_date = datetime(2020,8,7,hour=0,minute=00),
    schedule="0 * * * *",  # 분, 시간, 날짜, 월, 요일 순서
    tags=["example"],
    catchup=False,  # start_date로 생성하는 날보다 과거로 설정 시 그 차이를 따라잡을지(True), 따라잡지 않을지(False) 결정
    default_args=default_args

Bash Operator 예제

  • 3개의 태스크로 구성
  • t1은 현재 시간 출력
  • t2는 5초간 대기 후 종료
  • t3는 서버의 /tmp 디렉토리의 내용 출력
  • t1이 끝나고 t2와 t3를 병렬로 실행
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
	'owner':'anso',
    'email':['anso@mail.com'],
    'retries':1,
    'retry_delay':timedelta(minutes=3),
}
test_dat = DAG(
	"dag_v1",
    schedule="0 9 * * *",
    tags=['test'],
    catchUp=False,
    default_args=default_args
)

# 날짜 출력 태스크
t1 = BashOperator(
	task_id='print_date',
    bash_command='date',
    dat=test_dag)
    
# 5초동안 가만히 있고 종료
t2 = BashOperator(
	task_id='sleep',
    bash_command='sleep 5',
    dag=test_dag)
    
# tmp디렉토리 파일 조회 태스크
t3 = BahsOperator(
	task_id='ls',
    bash_command='ls/tmp',
    dag=test_dag)
    
 t1 >> [t2,t3]  # t1 실행 후 t2, t3 동시 실행

0개의 댓글