Airflow 사용기 (4) - ETL

code_able·2023년 3월 4일
0

오늘은 서로 다른 서버에 있는 두 postgres DB 사이에서 데이터를
ETL 해 보자.

상황은 A라는 서버의 postgres에서 데이터를 읽어
airflow가 설치된 ETL 서버에 탭으로 구분된 텍스트 파일(dataset.csv)로 내려받고,
B라는 postgres 서버로 데이터를 옮기는 것이다.
(아래 예제 코드는 편의상 하나의 Airflow 커넥션(postgres_default)으로 읽기와 쓰기를 모두 수행한다.)

postgres provider 설치 (PostgresHook이 포함된 패키지)

pip install apache-airflow-providers-postgres

DAG 작성

from datetime import datetime, timedelta
import logging
from textwrap import dedent
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresHook


# Logger used by the task callables below.
LOGGER = logging.getLogger(__name__)

# Airflow connection id shared by the extract and load steps.
POSTGRES_CONN_ID = "postgres_default"

# Column delimiter: interpolated into the COPY ... TO statement of the DAG
# and used to split each line of the dump file when loading.
SEPERATOR = "\t"

# Intermediate dump file on the Airflow worker. Despite the .csv suffix it
# holds tab-delimited COPY text output.
# NOTE(review): "/temp" is not a standard Linux directory; if "/tmp" was
# intended, fix it, otherwise confirm the directory exists on every worker.
FILE_PATH = "/temp/dataset.csv"

# NOTE(review): this hook is created at module import, i.e. each time the
# scheduler parses this DAG file; get_hook looks up the connection, so this
# presumably hits the metadata DB / secrets backend on every parse. Consider
# building the hook inside the task callables instead.
# NOTE(review): PostgresHook is imported from the operators module above; its
# canonical home is airflow.providers.postgres.hooks.postgres.
PG_HOOK = PostgresHook.get_hook(POSTGRES_CONN_ID)


def pg_extract(sql, conn_id=None):
    """Run a ``COPY ... TO STDOUT`` statement and dump its output to FILE_PATH.

    Args:
        sql: A PostgreSQL ``COPY (...) TO STDOUT`` statement; its output is
            written to FILE_PATH via ``copy_expert``.
        conn_id: Optional Airflow connection id of the *source* database.
            When omitted, the module-level PG_HOOK (POSTGRES_CONN_ID) is used,
            so existing callers behave exactly as before. Pass a different id
            to read from a server other than the one the load step writes to.
    """
    # Reuse the shared hook unless a distinct source connection is requested.
    hook = PG_HOOK if conn_id is None else PostgresHook(postgres_conn_id=conn_id)
    hook.copy_expert(sql, filename=FILE_PATH)
    LOGGER.info("success extract data")

# Backslash escapes that PostgreSQL's COPY ... TO emits in text format.
# COPY TO never emits the octal/hex escape forms, so they are not handled.
COPY_TEXT_ESCAPES = {"b": "\b", "f": "\f", "n": "\n", "r": "\r", "t": "\t", "v": "\v"}


def decode_copy_field(raw):
    r"""Decode one raw COPY text-format field.

    The NULL marker ``\N`` (the whole field) becomes None. Any other
    backslash sequence is decoded via COPY_TEXT_ESCAPES; an unknown escape
    (``\\``, an escaped delimiter, ...) stands for the next character itself.
    """
    if raw == "\\N":
        return None
    decoded = []
    chars = iter(raw)
    for ch in chars:
        if ch == "\\":
            nxt = next(chars, "")
            decoded.append(COPY_TEXT_ESCAPES.get(nxt, nxt))
        else:
            decoded.append(ch)
    return "".join(decoded)


def split_copy_line(line, sep):
    """Split one COPY text-format line (newline removed) into decoded fields.

    A delimiter preceded by a backslash is part of the field, not a split
    point; each field is then decoded with decode_copy_field.
    """
    fields = []
    current = []
    escaped = False
    for ch in line:
        if escaped:
            current.append(ch)
            escaped = False
        elif ch == "\\":
            # Keep the backslash; decode_copy_field interprets the pair.
            current.append(ch)
            escaped = True
        elif ch == sep:
            fields.append("".join(current))
            current = []
        else:
            current.append(ch)
    fields.append("".join(current))
    return [decode_copy_field(field) for field in fields]


def pg_load(target_table, fields, conn_id=None):
    """Insert the rows dumped by pg_extract into *target_table*.

    FILE_PATH is parsed as PostgreSQL COPY text format: columns are split on
    SEPERATOR, backslash escapes are decoded, and the NULL marker ``\\N``
    becomes None so NULL columns are inserted as SQL NULL rather than as a
    literal two-character string.

    Args:
        target_table: Table name passed to ``insert_rows`` (the DAG passes
            "extn.keyword2").
        fields: Target column names, in the same order as the dumped columns.
        conn_id: Optional Airflow connection id of the *target* database.
            When omitted, the module-level PG_HOOK is used (original behavior).
    """
    hook = PG_HOOK if conn_id is None else PostgresHook(postgres_conn_id=conn_id)
    with open(FILE_PATH, "r", encoding="utf-8") as file:
        # Each COPY row ends with exactly one "\n"; embedded newlines in the
        # data are escaped as "\n" text, so stripping the terminator is safe.
        data = [split_copy_line(line.rstrip("\n"), SEPERATOR) for line in file]
    hook.insert_rows(
        table=target_table,
        rows=data,
        replace=False,
        target_fields=fields,
    )
    LOGGER.info("success load data")
            

with DAG(
    "etl",
    description="etl",
    start_date=datetime(2022, 12, 25),
    schedule=timedelta(days=1),
    # With Airflow's default configuration (catchup_by_default=True) enabling
    # this DAG would schedule one run per day since start_date. Each run
    # copies the whole source table and appends it to the target, so catchup
    # would insert the same rows once per missed day. Run only the latest.
    catchup=False,
    tags=["etl"],
) as dag:

    # This module has no docstring (__doc__ would be None), so document the
    # DAG directly; dedent keeps the markdown from rendering as a code block.
    dag.doc_md = dedent(
        """
        etl test
        """
    )

    # NOTE(review): even with catchup disabled, every daily run appends a full
    # copy of extn.KEYWORD to extn.keyword2 (no WHERE clause here, and the
    # load uses replace=False). Truncate the target first or extract
    # incrementally (e.g. by created_at) if duplicate rows are not wanted.
    t1 = PythonOperator(
        task_id="extract",
        python_callable=pg_extract,
        op_kwargs={
            "sql": f"""
            COPY (
                SELECT id
                    , keyword
                    , created_at
                FROM extn.KEYWORD
            ) TO STDOUT WITH
            DELIMITER '{SEPERATOR}'
            """
        }
    )

    t1.doc_md = dedent(
        """
        # extract
        """
    )

    t2 = PythonOperator(
        task_id="load",
        python_callable=pg_load,
        op_kwargs={
            "target_table": "extn.keyword2",
            "fields": ["id", "keyword", "created_at"]
        }
    )

    t2.doc_md = dedent(
        """
        # load
        """
    )

    # Load only after the dump file has been written.
    t1 >> t2
profile
할수 있다! code able

0개의 댓글