Airflow dag execution_date의 dbt 적용

만년·2023년 2월 20일
0
post-thumbnail

기존 배치 작업들이 전부 current_date()로 dbt 모델을 실행하는 것을 발견했다.
이렇게 하면 airflow의 가장 편리한 기능인 backfill과 clear 기능을 사용해도 모든 모델이 현재 날짜를 기준으로 실행되버리기 때문에 이 부분을 개선 하려고 했고 일이 예상보다 길어졌다.

현재 환경은 아래와 같고

  • Airflow 3.2
  • DBT core 1.1.3

Airflow - DBT 연동은 아래 문서를 바탕으로 구현되어 있다.

dbt airflow를 지원하는 패키지가 있지만 아래 오류로 사용하지 못했다.
이 패키지를 정상적으로 사용하신 분이 있으면 제보 부탁드립니다.

  • 사용 패키지: dbt_airflow_macros ->
    EXECUTION_DATE="2020-01-01T01:23:45" dbt run 실행시 Not naive datetime (tzinfo is already set)오류가 발생, 사용 불가

결국 airflow-variable을 활용, dbt run 커맨드에 옵션으로 execution_date를 전달하는 방법으로 구현했다.
이 방법은 start task를 포함한 dag 전체를 재실행하거나, 다른 방법으로 airflow-variable에 설정된 dag_id_execution_date 값을 갱신할 때만 유효하다. 개별 task - clear 실행시 갱신되지 않은 airflow-variable 기준으로 task가 실행된다는 점을 유의해야 한다.


Scheduling and backfill을 위한 airflow-dbt execution_date 적용

  • 목적
    • execution date 도입을 통한 dag의 멱등성 확보
  • 설정

    1. airflow variable 등록

    2. dag 시작 지점인 start = PythonOperator에서 Airflow Variable에 실행일시 저장

        def start_dag(**op_kwarg):
            execution_date = op_kwarg["data_interval_start"]
            local_ed = Variable.set(f'{dag_id}_execution_date', execution_date.add(hours=9))
            logging.info(f'kst execution_date = {local_ed}') 
            
    3. dbt_project.yml에 var: execution_date: '' 설정

      vars:
        execution_date: ""	

      execution_date: "2020-01-01T01:23:45" 형식으로 초기값을 지정할수도 있지만, execution_date 미지정시 실행이 안되도록 비워둠

    4. dbt run 커맨드 실행시 --var 'execution date: execution_date '로 overriding

      • make_dbt_task 메소드의 command를 아래와 같이 설정
          command = (f"dbt {self.dbt_global_cli_flags} {dbt_verb} --vars 'execution_date: {execution_date}' "
          f"--target {self.dbt_target} --models {model_name} "
          f"--profiles-dir {self.dbt_profiles_dir} --project-dir {self.dbt_project_dir} "
          )
    5. sql 모델 내부에서 쿼리 헤드 작성

      {% call set_sql_header(config) %}
      DECLARE EXECUTION_DATE DATE DEFAULT DATE('{{var('execution_date')}}');
      {% endcall %}

시행착오 및 후기

- dbt macro를 이용해서 compile 단계에서 execution date를 받아오게 하는 방법
- dbt_project.yml 에 execution_date를 설정해주는 방법

위 두 방법 모두 dbt를 compile 하는 단계에서 execution_date가 고정되기 때문에 dag 개별 실행시간 정보를 다르게 설정할수 없었다.


참고자료

  1. DBT - Project variables
  2. Orchestrate dbt with Airflow
profile
Data Engineer

0개의 댓글