Airflow Backfill, Concurrency

Alan·2023년 3월 23일
0

Airflow 맛보기

목록 보기
6/7

Backfill

  • 이미 지난 날짜를 기준으로 재처리 하는 작업을 의미

  • 로그 파일을 바탕으로 데이터베이스에 상태를 기록

  • depends_on_past=True를 사용하여, 이전 작업 인스턴스의 성공여부에 따라 종속성을 부여할 수 있음 (a → b → c 라면 이전 작업의 a가 성공해야 다음 작업의 a를 trigger)

  • wait_for_downsteam=True를 사용하면, 이전 작업의 downstream 작업 인스턴스까지 성공할 때까지 대기함

  • --start_date는 필수이며, --end_date를 옵션으로 넣을 수 있음

docker compose run airflow-worker airflow dags backfill tutorial_example \
    --start-date 2015-06-01 \
    --end-date 2023-03-07
  • backfill 명령을 수행하지 않더라도 DAG 설정의 catchup=True를 설정하면, 가용한 concurrency를 모두 활용해서 backfill 작업을 수행할 수 있음

Parallelism, Concurrency

  • Airflow DAG 또는 Task의 병렬성과 동시성에 관한 설정

  • airflow.cfg 설정

    • core.parallelism : 전체 airflow에서 running될 수 있는 task의 수를 설정

    • core.dag_concurrency : DAG 당 running될 수 있는 task의 수를 설정

    • core.non_pooled_task_slot_count : number of task slots allocated to tasks not running in a pool

    • core.max_active_runs_per_dag : maximum number of active DAG runs

    • scheduler.max_threads : 스케줄러의 max 스레드 수

    • celery.worker_concurrency : celery 사용시, worker가 사용할 수 있는 task instance의 최대 수

  • DAG 단위 설정

    • default로는 airflow.cfg를 사용하지만, 설정하면 설정한 것을 사용

    • concurrency

      • 모든 active run 상태인 dags들 하위에서 실행가능한 최대 task instance의 수 "core.dag_concurrency"
    • max_active_runs

      • 해당 DAG의 최대 active run 상태가 가능한 DAG의 수. "core.max_active_runs_per_dag"
  • Operator 단위 설정
aggregate_db_message_job = BashOperator(
    task_id='aggregate_db_message_job',
    execution_timeout=timedelta(hours=3),
    pool='ep_data_pipeline_db_msg_agg',
    bash_command=aggregate_db_message_job_cmd,
    dag=dag)
BashOperator(
    task_id="heavy_task",
    bash_command="bash backup_data.sh",
    pool_slots=2,
    pool="maintenance",
)
BashOperator(
    task_id="light_task1",
    bash_command="bash check_files.sh",
    pool_slots=1,
    pool="maintenance",
)
  • Pool을 사용하여 임의 task 집합에 대한 실행 병렬을 제한

    • allocated slots을 할당하고, UI(Menu - Admin - Pool)에서 관리됨

    • Task를 생성할 때 pool 매개 변수를 사용하여 작업을 pool과 연결할 수 있음

    • slot이 가득 차면 후속 task는 대기열에 들어가고 slot이 확보되면 그때 실행되기 시작

    • Hadoop 클러스터에 접근할 때 여러 작업자가 무분별하게 자원을 사용하는 것을 방지할 수 있음

    • Pool을 지정하지 않으면 task는 default pool에 할당되고, 이는 128개의 slot으로 이루어져 있음

0개의 댓글