[TIL 53일자] 데브코스 데이터엔지니어링

·2023년 6월 26일
0

데브코스

목록 보기
49/55
post-thumbnail

📚 오늘 공부한 내용

1. DAG 실행하는 방법

  • schedule로 지정해 주기적으로 실행할 수 있다.
  • 다른 DAG에 의해 트리거로 실행되도록 처리해 준다. (이 방법이 더 좋음)
    • Explicit Trigger
      • DAG A가 분명하게 다음 DAG인 DAG B를 알고 있는 트리거
      • TriggerDagOperator
      • Jinja TEMPLATE을 사용해야 한다.
      • A->B
    • Reactive Trigger
      • DAG B가 DAG A가 끝나기를 대기하고 있다 DAG A의 태스크가 끝나면 실행
      • ExternalTaskSensor
      • DAG A가 이 사실을 알 수 없다.
      • 의존 관계가 DAG A에게 보이지 않기 때문에 발생할 수 있는 이슈가 있다.
      • B->A
  • BranchPythonOperator는 상황에 맞춰 뒤에 어떤 태스크가 실행될지를 동적으로 설정한다. 조건에 따라 다른 태스크로 분기한다.
  • LatestOnlyOperator는 실행되는 시점을 따지고 과거 데이터를 backfill 하기 위해 실행되는 거라 판단되면 실행이 중단되는 Operator이다. 불필요한 태스크를 처리한다.
  • Trigger Rule 보통은 앞단 태스크가 실행이 되어야 뒤의 태스크가 실행되게 되는데 어떤 경우는 앞단 태스크가 실패하더라도 뒤 태스크는 꼭 실행되어야 하는 경우와 앞단 중에 하나만 성공해도 뒤 태스크가 실행되어야 하는 경우에 대한 처리가 있다.


2. TriggerDagOperator

  • DAG A의 태스크를 TriggerDagRunOperator로 구현한다.
  • trigger_dag_id 뒤에 트리거 하려는 즉, 다음 DAG의 이름을 기재한다.
  • conf는 DAG B에 넘기고 싶은 정보를 주는 것으로 받은 쪽 DAG에서는 Jinja Template을 사용한다면 dag_run.conf["path"])로도 접근 가능하다. 만약 PythonOperator로 사용한다면 kwargs['dag_run'].conf.get('path')로 작성해 준다.
  • execution_date는
  • reset_dag_run을 True로 해 두면 해당 실행 일자에 DAG를 실행한 정보가 있더라도 실행하라는 뜻이다. 이미 있는 경우 값은 오버라이딩 된다.
  • wait_for_completion은 트리거 대상의 DAG가 끝날 때까지 기다리라는 뜻이다. DAG가 끝나면 해당 Operator가 끝나게 된다.
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
trigger_B = TriggerDagRunOperator(
 task_id="trigger_B",
 trigger_dag_id="트리거하려는DAG이름",
 conf = {'path':'/opt/ml/conf'},
 execution_date = "{{ds}}",
 reset_dag_run = True,
 wait_for_completion = True
)

3. Jinja Template

1) Jinja Template이란?

  • Jinja Template은 Python에서 널리 사용되는 템플릿 엔진이다.
  • flask에서 많이 쓰인다.
  • 하나의 언어 안에서 로직 및 변수를 가지고 다양한 케이스를 처리할 수 있다.
  • 변수는 {{}} 이중 중괄호를 감싸서 사용한다.
<h1>안녕하세요, {{ name }}님!</h1>
  • 제어문은 퍼센트 기호로 표시한다. {% %}
<ul>
{% for item in items %}
 <li>{{ item }}</li>
{% endfor %}
</ul>

2) Airflow에서 Jinja Template

  • Airflow에서 Jinja Template을 사용하면 작업 이름, 파라미터 또는 SQL 쿼리와 같은 작업 매개변수를 템플릿화된 문자열로 정의 가능하다.
  • 코드의 재사용이 늘어난다.
  • execution_date와 같은 태스크 실행할 때 주어지는 파라미터를 Jinja Template에서 쉽게 사용할 수 있다.
  • 또한 variable, connection 등에서 사용할 수 있다.
  • 모든 경우에 쓸 수 있는 게 아니라 BashOperator에서만 사용할 수 있다.
# 시스템 정보
task1 = BashOperator(
 task_id='task1',
 bash_command='echo "{{ ds }}"',
 dag=dag
)

# 동적 매개변수
task2 = BashOperator(
 task_id='task2',
 bash_command='echo "안녕하세요, {{ params.name }}!"',
 params={'name': 'John'}, # 사용자 정의 가능한 매개변수
 dag=dag
)

3) Airflow에서 사용 가능한 Jinja 변수들

  • {{ ds }}
  • {{ ds_nodash }}
  • {{ ts }}
  • {{ dag }}
  • {{ task }}
  • {{ dag_run }}
  • {{ var.value }}: {{ var.value.get('my.var', 'fallback') }}
  • {{ var.json }}: {{ var.json.my_dict_var.key1 }}
  • {{ conn }}: {{ conn.my_conn_id.login }}, {{ conn.my_conn_id.password }}

4. Sensor

1) Sensor란?

  • Sensor는 특정 조건이 충족될 때까지 대기하는 Operator이다.
  • 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용하다.
  • Airflow에서 제공하는 내장 Sensor
    • FileSensor
    • HttpSensor
    • SqlSensor
    • TimeSensor
    • ExternalTaskSensor
  • poke 모드는 체크 주기를 명확하게 보장할 수 있다. worker 하나를 잡고 주기적으로 check 하는 게 poke 모드이고, default 모드이다. reschedule 모드도 존재한다.
2) External Task Sensor
  • DAG B(뒤에 실행되어야 할 태스크)가 DAG A의 특정 태스크가 끝났는지를 체크한다.
  • 동일한 schedule_interval을 사용해야 한다.
  • 웬만해서는 사용하지 않는 것이 좋다.
from airflow.sensors.external_task import ExternalTaskSensor
waiting_for_end_of_dag_a = ExternalTaskSensor(
 task_id='waiting_for_end_of_dag_a',
 external_dag_id='DAG이름',
 external_task_id='end',
 timeout=5*60,
 mode='reschedule'
)

5. Trigger Rule

  • 윗단 태스크의 성공 실패 상황에 따라 뒷단 태스크의 실행 여부를 결정하고 싶을 때 사용한다.
  • Operator에 trigger_rule이란 파라미터로 결정한다.
  • 설정 값
    • ALL_SUCCESS (default)
    • ALL_FAILED
    • ALL_DONE
    • ONE_FAILED
    • ONE_SUCCESS
    • NONE_FAILED
    • NONE_FAILED_MIN_ONE_SUCCESS

6. 태스크 그룹핑

  • 태스크의 수가 많은 DAG라면 태스크들을 성격에 따라 관리할 필요성이 있다.
  • 이전에는 SubDAG가 사용되다 Task Grouping으로 넘어가는 추세이다.
  • TaskGroup 안에 TaskGroup도 가능하다. (nesting 가능)
  • 태스크처럼 실행 순서 역시 정의 가능하다.
  • 한 군데 묶여 보이기 때문에 관리가 쉽다는 장점이 있다.

7. Dynamic Dag

  • 템플릿 형태로 DAG를 찍어내야 하는 경우가 생기는데 이를 매번 개발자가 해 주는 것이 아니라 이걸 코드를 통해서 해 준다면 더 효율적이고 생산성이 높다.
  • DAG 코드를 개발자가 아닌 코드로 제공한다.
  • DAG를 계속해서 만드는 것과 한 DAG 내에서 태스크를 늘리는 것 사이의 밸런스가 필요하다. (개인적인 경험은 비슷해도 오너가 다른 경우 Dag level에서 분리하는 것이 좋다. 태스크 수가 너무 많아지는 경우도 Dag를 분리하는 게 좋다.)
  • Jinja 템플릿을 하나 만들어 두고, generator.py라는 변환기를 만들어 준다. config.yml 파일을 통해 세팅을 해 주고 generator.py를 돌려 주면 DAG가 생성되게 된다.
    • generator.py를 자동화해 줄지 매번 사람이 돌려 줄지에 대해서는 고민해 봐야 할 문제이다.


🔎 어려웠던 내용 & 새로 알게 된 내용

📌 과제

  1. curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/config의 경우 Access가 거부 당하게 되는데 이를 컨트롤 해 주는 KEY가 무엇인지와 그 키를 docker-compose.yaml에 어떻게 적용해야 하는지
    • docker-compose.yaml 파일에서는 airflow-common 밑에 있는 environments 밑의 AIRFLOW__WEBSERVER__EXPOSE_CONFIG를 'true'로 설정해 주어야 한다.
    • docker compose 환경을 down 한 후 다시 up을 통해 실행해 준다.
    • 그 후 config API를 호출해서 결과가 나오는지 확인해 본다.
    • 이번에는 access denied가 뜨지 않고 config 환경이 잘 나오는 것을 볼 수 있다.
  2. connections API와 variables API는 환경 변수도 리턴하는지 아닌지
    • Web UI를 통해서 세팅한 variables와 connections만 출력된다.

1. task 관련 명령어

airflow tasks list DAG명: DAG의 task list를 가지고 온다.
airflow tasks test DAG명 task명 execution_date: 해당 DAG의 task를 실행한다.

2. docker container 내부에서 postgres 접근

psql -h postgres: 입력 후 로그인
\dt: 모든 테이블이 조회된다
control + L: clear와 동일한 기능
dag_run: 해당 테이블에 실행 정보가 저장됨



✍ 회고

- OS 환경을 맥으로 바꿨다. 맥 환경으로 세팅을 다 다시 해 줬는데 그게 꽤 걸렸다.

- Operator들이 많다는 것은 Airflow 책 스터디를 하면서도 느꼈는데 생각보다 더 많은 Operator가 존재하고 이걸 각각 어떻게 활용해야 하는지를 프로젝트를 하면서 느끼게 될 것 같다. 개인적으로 트리거는 특히나 사용할 일이 많을 것 같은데 Explicit Trigger를 쓰는 것이 더 안정적인 프로그램이 될 거라고 생각했다. B가 A의 끝남을 받는 것보다 A가 확실하게 끝난 후에 B를 진행하는 게 프로그램에서 꼬이는 부분이 안 생기지 않을까라는 생각이 들었는데 어떤 경우 Reactive Trigger를 사용했을 때 더 효율적인지 케이스가 궁금해졌다.

profile
송의 개발 LOG

0개의 댓글