[Airflow] task 간 데이터 공유, PostgresOperator 사용하기

Woong·2024년 10월 7일
0

Apache Airflow

목록 보기
4/9

Airflow task 간 데이터 공유

  • airflow 의 task 는 독립적으로 실행
    • 메모리에서 데이터 공유 불가
    • task 간 데이터는 별도로 저장해야 읽을 수 있다
      • XCom 으로 meta store 에서 picklable 직렬화 객체를 디스크에 저장하고 읽는 방식
        • 적은 용량의 object 에 유용
      • DB 나 파일 등 외부 저장소에 저장하는 방법

PostgresOperator

  • 추가 의존성 설치
    pip3 install apache-airflow-providers-postgres
  • DB 접근하기 위해 credential 저장
    • 등록 후 UI 에서 Admin > Connections 에서 확인 가능
airflow connections add \
  --conn-type postgres \
  --conn-host <host> \
  --conn-login <username> \
  --conn-password <password> \ 
  <connection 식별자>
  • PostgresOperator 를 통해 SQL 실행
    • hook 을 통해 Postgres 연결 생성, 쿼리 실행, 연결 종료
    • operator 자체는 요청을 hook 으로 전달하는 것만 수행
    • postgres_conn_id 로 자격증명 지정
    • template_searchpath: 파일을 탐색할 경로를 추가로 지정
      • template_ext 로 지정된 확장자(.sql)가 일치하면 SQL template화
from urllib import request

import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

dag = DAG(
    dag_id="listing_4_20",
    start_date=airflow.utils.dates.days_ago(1),
    schedule_interval="@hourly",
    template_searchpath="/tmp",
    max_active_runs=1,
)


def _get_data(year, month, day, hour, output_path):
    url = (
        "https://dumps.wikimedia.org/other/pageviews/"
        f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
    )
    request.urlretrieve(url, output_path)


get_data = PythonOperator(
    task_id="get_data",
    python_callable=_get_data,
    op_kwargs={
        "year": "{{ execution_date.year }}",
        "month": "{{ execution_date.month }}",
        "day": "{{ execution_date.day }}",
        "hour": "{{ execution_date.hour }}",
        "output_path": "/tmp/wikipageviews.gz",
    },
    dag=dag,
)


extract_gz = BashOperator(
    task_id="extract_gz", bash_command="gunzip --force /tmp/wikipageviews.gz", dag=dag
)


def _fetch_pageviews(pagenames, execution_date):
    result = dict.fromkeys(pagenames, 0)
    with open("/tmp/wikipageviews", "r") as f:
        for line in f:
            domain_code, page_title, view_counts, _ = line.split(" ")
            if domain_code == "en" and page_title in pagenames:
                result[page_title] = view_counts

    # PostgreSQL 에 실행할 SQL 을 파일로 작성
    with open("/tmp/postgres_query.sql", "w") as f:
        for pagename, pageviewcount in result.items():
            f.write(
                "INSERT INTO pageview_counts VALUES ("
                f"'{pagename}', {pageviewcount}, '{execution_date}'"
                ");\n"
            )


fetch_pageviews = PythonOperator(
    task_id="fetch_pageviews",
    python_callable=_fetch_pageviews,
    op_kwargs={"pagenames": {"Google", "Amazon", "Apple", "Microsoft", "Facebook"}},
    dag=dag,
)

# PostgresOperator 를 통해 SQL 실행. 실행할 SQL 파일명 지정
write_to_postgres = PostgresOperator(
    task_id="write_to_postgres",
    # 자격증명
    postgres_conn_id="my_postgres",
    # 실행할 SQL 파일
    sql="postgres_query.sql",
    dag=dag,
)

get_data >> extract_gz >> fetch_pageviews >> write_to_postgres

reference

0개의 댓글