이미 지난 날짜를 기준으로 재처리 하는 작업을 의미
로그 파일을 바탕으로 데이터베이스에 상태를 기록
depends_on_past=True를 사용하여, 이전 작업 인스턴스의 성공여부에 따라 종속성을 부여할 수 있음 (a → b → c 라면 이전 작업의 a가 성공해야 다음 작업의 a를 trigger)
wait_for_downsteam=True를 사용하면, 이전 작업의 downstream 작업 인스턴스까지 성공할 때까지 대기함
--start_date는 필수이며, --end_date를 옵션으로 넣을 수 있음
docker compose run airflow-worker airflow dags backfill tutorial_example \
--start-date 2015-06-01 \
--end-date 2023-03-07
Airflow DAG 또는 Task의 병렬성과 동시성에 관한 설정
airflow.cfg 설정
core.parallelism : 전체 airflow에서 running될 수 있는 task의 수를 설정
core.dag_concurrency : DAG 당 running될 수 있는 task의 수를 설정
core.non_pooled_task_slot_count : number of task slots allocated to tasks not running in a pool
core.max_active_runs_per_dag : maximum number of active DAG runs
scheduler.max_threads : 스케줄러의 max 스레드 수
celery.worker_concurrency : celery 사용시, worker가 사용할 수 있는 task instance의 최대 수
DAG 단위 설정
default로는 airflow.cfg를 사용하지만, 설정하면 설정한 것을 사용
concurrency
max_active_runs
aggregate_db_message_job = BashOperator(
task_id='aggregate_db_message_job',
execution_timeout=timedelta(hours=3),
pool='ep_data_pipeline_db_msg_agg',
bash_command=aggregate_db_message_job_cmd,
dag=dag)
BashOperator(
task_id="heavy_task",
bash_command="bash backup_data.sh",
pool_slots=2,
pool="maintenance",
)
BashOperator(
task_id="light_task1",
bash_command="bash check_files.sh",
pool_slots=1,
pool="maintenance",
)
Pool을 사용하여 임의 task 집합에 대한 실행 병렬을 제한
allocated slots을 할당하고, UI(Menu - Admin - Pool)에서 관리됨
Task를 생성할 때 pool 매개 변수를 사용하여 작업을 pool과 연결할 수 있음
slot이 가득 차면 후속 task는 대기열에 들어가고 slot이 확보되면 그때 실행되기 시작
Hadoop 클러스터에 접근할 때 여러 작업자가 무분별하게 자원을 사용하는 것을 방지할 수 있음
Pool을 지정하지 않으면 task는 default pool에 할당되고, 이는 128개의 slot으로 이루어져 있음