기존 배치 작업들이 전부 current_date()로 dbt 모델을 실행하는 것을 발견했다.
이렇게 하면 airflow의 가장 편리한 기능인 backfill과 clear 기능을 사용해도 모든 모델이 현재 날짜를 기준으로 실행되버리기 때문에 이 부분을 개선 하려고 했고 일이 예상보다 길어졌다.
현재 환경은 아래와 같고
Airflow - DBT 연동은 아래 문서를 바탕으로 구현되어 있다.
dbt airflow를 지원하는 패키지가 있지만 아래 오류로 사용하지 못했다.
이 패키지를 정상적으로 사용하신 분이 있으면 제보 부탁드립니다.
결국 airflow-variable을 활용, dbt run 커맨드에 옵션으로 execution_date를 전달하는 방법으로 구현했다.
이 방법은 start task를 포함한 dag 전체를 재실행하거나, 다른 방법으로 airflow-variable에 설정된 dag_id_execution_date 값을 갱신할 때만 유효하다. 개별 task - clear 실행시 갱신되지 않은 airflow-variable 기준으로 task가 실행된다는 점을 유의해야 한다.
설정
airflow variable 등록
dag 시작 지점인 start = PythonOperator에서 Airflow Variable에 실행일시 저장
def start_dag(**op_kwarg):
execution_date = op_kwarg["data_interval_start"]
local_ed = Variable.set(f'{dag_id}_execution_date', execution_date.add(hours=9))
logging.info(f'kst execution_date = {local_ed}')
dbt_project.yml에 var: execution_date: '' 설정
vars:
execution_date: ""
execution_date: "2020-01-01T01:23:45" 형식으로 초기값을 지정할수도 있지만, execution_date 미지정시 실행이 안되도록 비워둠
dbt run 커맨드 실행시 --var 'execution date: execution_date '로 overriding
command = (f"dbt {self.dbt_global_cli_flags} {dbt_verb} --vars 'execution_date: {execution_date}' "
f"--target {self.dbt_target} --models {model_name} "
f"--profiles-dir {self.dbt_profiles_dir} --project-dir {self.dbt_project_dir} "
)
sql 모델 내부에서 쿼리 헤드 작성
{% call set_sql_header(config) %}
DECLARE EXECUTION_DATE DATE DEFAULT DATE('{{var('execution_date')}}');
{% endcall %}
- dbt macro를 이용해서 compile 단계에서 execution date를 받아오게 하는 방법
- dbt_project.yml 에 execution_date를 설정해주는 방법
위 두 방법 모두 dbt를 compile 하는 단계에서 execution_date가 고정되기 때문에 dag 개별 실행시간 정보를 다르게 설정할수 없었다.