Airflow.cfg

💡 Windows기반 Docker 사용 시 airflow.cfg 파일 접근, 수정 방법
1. Docker GUI 실행
2. docker-webserver 컨테이너 클릭
3. Files 탭 클릭
4. opt > airflow > airflow.cfg

  • Airflow.cfg 파일에서 DAGs 폴더가 지정되는 위치 : core 섹션의 dags_folder라는 키에 지정된다. (도커 컴포즈 : /opt/airflow/dags)
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = /opt/airflow/dags
  • DAGs 폴더에 새로운 Dag를 만들면 언제 실제로 Airflow 시스템에서 이를 알게 되나? 이 스캔 주기를 결정해주는 키의 이름이 무엇인가? : 주기적으로 스캔하게 되며 dags 폴더와 동일하게 core 섹션의 scheduler_dag_dir_list_interval 키가 스캔 주기를 결정한다. 300이면 5분, 5분에 한 번씩 "core 섹션의 dags_folder가 가리키는 디렉토리를 스캔한다. 또한 서브 디렉토리까지도" -> 파이썬 파일들을 모두 실행해보면서 수행한다.

  • 이 파일에서 Airflow를 API 형태로 외부에서 조작하고 싶다면 어느 섹션을 변경해야하는가? : api 섹션의 auth_backend를 airflow.api.auth.backend.basic_auth로 변경 -> airflow 로그인할 때 사용했던 정보를 이용해서 api를 외부에서 호출하고 특정 대그를 실행하거나 어떤 대그들이 있는지 리스트화하는 것들이 가능해진다.

  • Variable에서 변수의 값이 encrypted가 되려면 변수의 이름에 어떤 단어들이 들어가야 하는데 이 단어들은 무엇일까? : password, secret, passwd, authorization, api_key, apikey, access_token -> 여기서 한 단계 더 나아가면 prefix의 형태로 변수 이름 앞에 특정 시퀀스가 있다면, 변수의 값을 엔크립션해서 저장하는 것까지 되기도 한다.

  • 이 환경 설정 파일이 수정되었다면 이를 실제로 반영하기 위해서 해야 하는 일은? : 스케줄러와 웹서버를 재시작해야함. (도커 컴포즈를 사용하는 환경이라면, airflow.cfg의 내용이 보관이 되지 않기 때문에 원래 값으로 돌아가게 된다. docker-compose.yaml 내부에 변경하고 싶은 내용을 지정해야 변경이 유지된다. compose down -> up 하면 변경사항이 유지된다.) Docker라면 docker-compose.yaml 파일이 변경되고 docker compose down과 docker compose up을 차례로 실행

  • Metadata DB의 내용을 암호화하는데 사용되는 키는 무엇인가? : fernet_key 값을 세팅해주면 암호화되어 저장된다. (airflow는 자기 정보를 postgres에 적는다. 이를 보통 메타 데이터 DB라고 부른다. 이 내용이 외부에 노출되면 보안상 이슈 발생) 설정 안하면 plain text로 저장된다. 단, fernet key를 잃어버리면 DB 세팅 불가

확인

# 웹서버로 진입
docker exec -it airflow-setup-airflow-webserver-1 sh

# 현재 경로 확인
pwd
'''/opt/airflow'''

# 서브 폴더 및 파일 보기
ls -tl
'''airflow.cfg 파일이 있는 것을 확인할 수 있다.'''

# airflow.cfg 확인
cat airflow.cfg | grep dags

업데이트되는 디렉토리는 /opt/airflow/dags이다. dags 폴더 내부로 진입해본다. 또한, 이 디렉토리는 로컬 컴퓨터의 특정 폴더와 매칭(싱크)이 되어있다. 로컬 컴퓨터의 터미널에서 수정을 하면 바로 반영이 된다.

그러나, airflow.cfg는 바로 반영되도록 설정되어 있지 않은 상태이다. 따라서, cd .. 을 통해 dags 폴더를 나오고 이곳에 있는 airflow.cfg는 수정하고 나가서 도커 컴포즈를 다운했다가 업한다고 해서 변경 내용이 반영되지 않는다.

따라서, exit을 통해 host system으로 돌아오고 docker-compose.yaml 파일을 열어보면 environment 부분을 볼 수 있다.

AIRFLOW -> airflow.cfg의 내용을 덮어쓰라는 의미
AIRFLOW
CORE__FERNET_KEY : ' ' -> aiflow.cfg의 core 섹션에 fernet key의 값을 ''로 세팅하라. 값이 없으므로 암호화를 안 한 것
따라서, docker-compose로 airflow의 내용을 변경하고 싶을 때 yaml 파일을 변경하면 된다.


docker-compose를 통해 실행한 airflow는 api가 이미 활성화되어 있다. 따라서, airflow와 관계된 사용자 ip, pd만 알면 api를 호출할 수 있다.

즉, airflow.cfg의 내용을 docker compose로 실행하는 airflow의 경우 airflow.cfg의 내용을 직접 건드리는 것이 아니라 docker-compose.yaml 파일의 environment 밑에 키 입력 패턴을 따라 작성해주면 된다.
AIRFLOW(언더바 2개)섹션이름(언더바 2개)키이름: 키의 값



세계 나라 정보 API 사용 DAG 작성


Open Weathermap DAG로 서울의 날씨 정보를 읽어오는 DAG 구현하기

Open Weathermap

  • 위도/경도를 기반으로 그 지역의 기후 정보를 알려주는 서비스

  • 무료 계정으로 api key를 받아서 이를 호출시에 사용

  • https://openweathermap.org/price

  • API Key를 open_weather_api_key라는 Variable로 저장


    변수명에 api_key라는 단어가 포함되었기 때문에 암호화된 것처럼 보이지만, 실제로는 fernet key 설정을 안 했기 때문에 내용 자체는 암호화되지 않은 상태로 DB에 저장됨

  • 서울의 위도와 경도를 찾을 것

    • One-Call API를 사용: https://openweathermap.org/api/one-call-api
      • 앞서 API KEY와 서울의 위도/경도를 사용해서 위의 API를 requests 모듈을 사용해서 호출
      • 응답 결과에서 온도 정보(평균/최소/최대)만 앞으로 7일을 대상으로 출력해볼 것 : 날짜, 낮 온도(day), 최소 온도(min), 최대 온도(max)
# API 엔드포인트 형태
https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={API
key}&units=metric

{lat} : 위도
{lon} : 경도
units=metric : 섭씨 사용

API 호출에 대한 응답 형태

일별, 시간별, 분별 날씨 정보 등, dt는 리눅스에서 시간을 나타내는 방식. 1970년 1월 1일 이후로 몇 초가 지났는지 나타내준다. dt는 어떤 날인지. 그 날 태양, 달이 언제 뜨고 지는지. 달의 모양이 어떤지, temp아래에 그 날의 온도 정보. 유료 api를 사용하면 과거, 8일 이후 등 더 광범위한 날씨 정보를 불러올 수 있다.

  • Open Weathermap의 one call API(api 엔드포인트)를 사용해서 서울의 다음 8일간의 낮/최소/최대 온도를 읽어다가 각자 스키마 밑의 weather_forecast라는 테이블로 저장
    • https://openweathermap.org/api/one-call-api를 호출해서 테이블을 채움
    • weather_forecast라는 테이블이 대상이 됨 (여기서 유의할 점은 created_date은 레코드 생성시간으로 자동 채워지는 필드라는 점)
CREATE TABLE keeyong.weather_forecast (
 date date primary key,
 temp float, -- 낮 온도
 min_temp float,
 max_temp float,
 created_date timestamp default GETDATE()
);

GETDATE()는 레코드가 생성될 때 시간을 넣는다는 것
해당 DAG를 full refresh로 생성한 뒤에 incremental update로 변경해볼 것이다. GETDATE 정보는 full refresh에서는 쓰이지 않지만 incremental update시 사용된다.

  • One-Call API는 결과를 JSON 형태로 리턴해줌

    • 이를 읽어들이려면 requests.get 결과의 text를 JSON으로 변환해 주어야함
    • 아니면 requests.get 결과 오브젝트가 제공해주는 .json()이란 함수 사용
      • f = requests.get(link)
        • f_js = f.json()
  • 결과 JSON에서 daily라는 필드에 앞으로 8일간 날씨 정보가 들어감 있음

    • daily 필드는 리스트이며 각 레코드가 하나의 날짜에 해당
    • 날짜 정보는 “dt”라는 필드에 들어 있음. 이는 epoch이라고 해서 1970년 1월 1일 이후 밀리세컨드로 시간을 표시. 이는 아래와 같은 코드로 읽을 수 있는 날짜로 변경 가능
      • datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') # 2021-10-09

epoch로 주어진 값이 d["dt"] 와 같은 딕셔너리 아이템에 있다면 사람이 읽을 수 있는 형태로 변환하고 싶을 때 위의 코드를 사용한다.

  • Airflow Connections를 통해 만들어진 Redshift connection

    • 기본 autocommit의 값은 False인 점을 유의
  • 두 가지 방식의 Full Refresh 구현 방식

    • Full Refresh와 INSERT INTO를 사용
    • Full Refresh와 COPY를 사용 -> 나중에 사용해볼 예정
  • API Key는 어디에 저장해야할까? -> airflow > variables > 키 생성

  • Full Refresh : 매번 테이블을 지우고 다시 빌드

  • DW상의 테이블은 아래처럼 정의

CREATE TABLE keeyong.weather_forecast (
 date date primary key,
 temp float, -- 낮 온도
 min_temp float,
 max_temp float,
 created_date timestamp default GETDATE()
);

full refresh

코드 출처

from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task

from datetime import datetime
from datetime import timedelta

import requests
import logging
import json


def get_Redshift_connection():
    # autocommit is False by default
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    return hook.get_conn().cursor()

@task
def etl(schema, table):
    api_key = Variable.get("open_weather_api_key") #airflow variables에 저장되었던 키를 읽어와서 
    # 서울의 위도/경도 설정 (하드코딩)
    lat = 37.5665
    lon = 126.9780

    # https://openweathermap.org/api/one-call-api ->API URL
    url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts" #daily 정보만 필요하기 때문에 안 쓰는 정보들은 exclude로 제외
    response = requests.get(url)
    data = json.loads(response.text) #response.json()과 동일
    """
    {'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
    """
    ret = []
    for d in data["daily"]: #data["daily"]에 8일치의 날씨 정보가 들어있다.
        day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
        ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))

    cur = get_Redshift_connection()
    drop_recreate_sql = f"""DROP TABLE IF EXISTS {schema}.{table}; #full refresh 형태
CREATE TABLE {schema}.{table} (
    date date,
    temp float,
    min_temp float,
    max_temp float,
    created_date timestamp default GETDATE()
);
"""
    insert_sql = f"""INSERT INTO {schema}.{table} VALUES """ + ",".join(ret) #정보 insert
    logging.info(drop_recreate_sql)
    logging.info(insert_sql)
    try:
        cur.execute(drop_recreate_sql)
        cur.execute(insert_sql)
        cur.execute("Commit;")
    except Exception as e:
        cur.execute("Rollback;")
        raise

with DAG(
    dag_id = 'Weather_to_Redshift',
    start_date = datetime(2023,5,30), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 2 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
) as dag:

    etl("keeyong", "weather_forecast")

task가 한 개, task명은 etl이다.
하드코딩된 서울의 위도, 경도 부분을 etl 함수의 파라미터로 전달하는 방식으로 수정 추천


incremental update

오늘 DAG를 실행하면 내일부터 8일간의 레코드를 불러온다. 내일 또 실행하면 다음 날부터 8일간의 레코드를 불러온다. -> 7일 정보가 중복될 것이다. 겹치는 날짜는 언제 불러온 것을 우선시 할 것인가? 일기예보는 미래의 날짜일수록 더 정확할 것이다. 따라서, 우선 순위를 정할 때 created_date timestamp default GETDATE() 정보를 활용한다.
-> ROW_NUMBER 문법 사용


Primary Key Uniqueness 보장

보통 데이터 웨어하우스가 지정해주는 업데이트 방법이 있다.(Upsert) 이것을 사용하는 것이 가장 좋지만, 직접 한다면 ROW_NUMBER 사용

Primary Key Uniqueness란

  • 테이블에서 하나의 레코드를 유일하게 지칭할 수 있는 필드(들)
    • 하나의 필드가 일반적이지만 다수의 필드를 사용할 수도 있음 (컬럼 조합, compose key라고 칭함)
    • 이를 CREATE TABLE 사용시 지정
  • 관계형 데이터베이스 시스템이 Primary key의 값이 중복 존재하는 것을 막아줌
    • 예 1) Users 테이블에서 email 필드
    • 예 2) Products 테이블에서 product_id 필드
CREATE TABLE products (
 product_id INT PRIMARY KEY,
 name VARCHAR(50),
 price decimal(7, 2)
);

CREATE TABLE orders (
 order_id INT,
 product_id INT,
 PRIMARY KEY (order_id, product_id),
 FOREIGN KEY (product_id) REFERENCES products (product_id)
);

데이터 웨어하우스가 프라이머리 키 유니크를 보장하지 않는다. 기본키 컬럼이 유니크함을 보장하는 것은 개발자의 책임.

빅데이터 기반 데이터 웨어하우스들은 Primary Key를 지켜주지 않음

  • Primary key를 기준으로 유일성 보장을 해주지 않음 : 이를 보장하는 것은 데이터 인력의 책임
  • Primary key 유일성을 보장해주지 않는 이유는? : 보장하는데 메모리와 시간이 더 들기 때문에 대용량 데이터의 적재가 걸림돌이 됨
CREATE TABLE keeyong.test (
 date date primary key,
 value bigint
);
Primary Key Uniqueness 보장하기
INSERT INTO keeyong.test VALUES ('2023-05-10', 100);
INSERT INTO keeyong.test VALUES ('2023-05-10', 150); -- 이 작업이
성공함! 

Primary Key 유지 방법

  • 앞서 살펴본 keeyong.weather_forecast 테이블을 대상으로 살펴보자
CREATE TABLE keeyong.weather_forecast (
 date date primary key,
 temp float,
 min_temp float,
 max_temp float,
 created_date timestamp default GETDATE()
);

Primary Key Uniqueness 보장하기

  • 날씨 정보이기 때문에 최근 정보가 더 신뢰할 수 있음.
  • 그래서 어느 정보가 더 최근 정보인지 created_date 필드에 기록하고 이를 활용
  • 즉 date이 같은 레코드들이 있다면 created_date을 기준으로 더 최근 정보를 선택-> 이를 하는데 적합한 SQL 문법이 ROW_NUMBER

💡 Primary Key 보장 방법

  • 임시 테이블(스테이징 테이블)을 만들고 거기로 현재 모든 레코드를 복사
  • 임시 테이블에 새로 데이터소스에서 읽어들인 레코드들을 복사 -> 이 때 중복 존재 가능
  • 중복을 걸러주는 SQL 작성:
    • 최신 레코드를 우선 순위로 선택
    • ROW_NUMBER를 이용해서 primary key로 partition을 잡고 적당한 다른 필드(보통 타임스탬프 필드)로 ordering(역순 DESC)을 수행해 primary key별로 하나의 레코드를 잡아냄
  • 위의 SQL을 바탕으로 최종 원본 테이블로 복사
    • 이때 원본 테이블에서 레코드들을 삭제
    • 임시 temp 테이블을 원본 테이블로 복사 (일련번호가 1번인 것들만 선택)

💡 Primary Key 보장 방법 코드

# 1. 원래 테이블의 내용을 임시 테이블 t로 복사
CREATE TEMP TABLE t AS SELECT * FROM keeyong.weather_forecast;
# 2. DAG는 임시 테이블(스테이징 테이블)에 레코드를 추가. 이때 중복 데이터가 들어갈 수 있음
# 3. 원본 테이블 내용 삭제
DELETE FROM keeyong.weather_forecast;
# 4. 중복을 없앤 형태로 새로운 테이블 생성
INSERT INTO keeyong.weather_forecast
SELECT date, temp, min_temp, max_temp, created_date
FROM (
 SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
 FROM t
)
WHERE seq = 1; # 일련번호가 1인 레코드들만 원본 테이블로 복사한다. 

위의 코드는 매번 새로 덮어쓰는 형식의 업데이트를 가정한다.
매일 날짜가 하나씩 늘어가는 형태로 레코드가 유지될 것이다.
3번과 4번 과정은 Transaction으로 묶여야 한다. 그렇지 않으면 delete하고 커밋된 것을 사용자가 엑세스하면 내용이 없을 수 있는 등 문제 발생 (autocommit 이 True인 경우)
(autocommit이 false인 경우도) delete와 insert가 끝난 후 커밋을 해야 한다. delete만 하고 커밋하면 다른 사용자가 접근했을 때 레코드가 아무것도 보이지 않게 된다. insert 하다가 에러가 발생해도 delete된 상태가 이미 커밋되었기 때문에 문제가 있을 것이다. autocommit이 True, False에 상관없이 3번과 4번은 트랜잭션으로 묶여야 한다.

Weather_Forecast DAG를 Incremental Update로 구현

코드 출처

from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook

from datetime import datetime
from datetime import timedelta

import requests
import logging
import json


def get_Redshift_connection():
    # autocommit is False by default
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    return hook.get_conn().cursor()


@task
def etl(schema, table, lat, lon, api_key):

    # https://openweathermap.org/api/one-call-api
    url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
    response = requests.get(url)
    data = json.loads(response.text)

    """
    {'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
    """
    ret = [] #API 호출 결과 저장
    for d in data["daily"]:
        day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
        ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))

    cur = get_Redshift_connection()
    
    # 원본 테이블이 없다면 생성
    create_table_sql = f"""CREATE TABLE IF NOT EXISTS {schema}.{table} (
    date date,
    temp float,
    min_temp float,
    max_temp float,
    created_date timestamp default GETDATE()
);"""
    logging.info(create_table_sql)

    # 임시 테이블 생성
    create_t_sql = f"""CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};""" #TEMP 테이블 생성 (원본 테이블의 내용을 읽어와서 CTAS 함)
    logging.info(create_t_sql)
    try:
        cur.execute(create_table_sql)
        cur.execute(create_t_sql)
        cur.execute("COMMIT;")
    except Exception as e:
        cur.execute("ROLLBACK;")
        raise

    # 임시 테이블 데이터 입력
    insert_sql = f"INSERT INTO t VALUES " + ",".join(ret) #8일치 날씨 데이터가 저장된 ret리스트를 temp테이블에 insert 한다. 
    logging.info(insert_sql)
    try:
        cur.execute(insert_sql)
        cur.execute("COMMIT;")
    except Exception as e:
        cur.execute("ROLLBACK;")
        raise

    # 기존 테이블 대체
    alter_sql = f"""DELETE FROM {schema}.{table}; #원본 테이블의 레코드를 다 날리고
      INSERT INTO {schema}.{table} #원본 테이블에 temp 테이블의 내용을 insert한다.
      SELECT date, temp, min_temp, max_temp FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
        FROM t
      )
      WHERE seq = 1;""" #일련번호가 1번인 것만 읽어서, 날짜 낮 온도, 최저 최고 온도 저장 -> primary key uniqueness를 직접 보장하는 방법 
    logging.info(alter_sql)
    try:
        cur.execute(alter_sql)
        cur.execute("COMMIT;")
    except Exception as e:
        cur.execute("ROLLBACK;")
        raise


with DAG(
    dag_id = 'Weather_to_Redshift_v2',
    start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 4 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
) as dag:

    etl("keeyong", "weather_forecast_v2", 37.5665, 126.9780, Variable.get("open_weather_api_key"))

ROW_NUMBER로 일련번호가 1번인 것만 읽어오는 Primary Key Uniqueness를 직접 보장하는 방법이 비효율적으로 보일 수 있지만, 이것을 효율적으로 처리할 수 있게 만들어진 것이 데이터 웨엏우스이다. 각 데이터 웨어하우스에서 지정하는 방식(Upsert)은 6주차에 진행


Upsert

  • insert + update
  • Primary Key를 기준으로 존재하는 레코드라면 새 정보로 수정 (update)
  • 존재하지 않는 레코드라면 새 레코드로 적재 (insert)
  • 보통 데이터 웨어하우스마다 UPSERT를 효율적으로 해주는 문법을 지원해줌
    - 뒤에서 MySQL to Redshift DAG를 구현할 때 살펴볼 예정

    직접 구현했던 PK Uniqueness 보장 방법 중 temp 테이블에 레코드를 적재할 때, ideal하게는 받아온 날짜 정보를 저장하는 ret에 8개의 날짜를 weather_forecast 테이블에 바로 적재를 하면서 해당 PK가 존재하는 레코드라면 새로운 정보로 덮어쓰고 PK가 존재하지 않는 레코드라면 새로 레코드를 만드는 것이 제일 편할 것이다. (파티션 등의 과정이 필요없어지기 때문)
    -> 이것이 Upsert 이다.

Redshift는 copy sql을 쓸 때 지원해준다.
Bigquery, Snowflake 등 빅데이터 기반 데이터 웨어하우스는 PK Uniqueness를 보장하지 않지만, 전부 Upsert 문법이 존재한다.
Upsert는 레코드 단위로 동작하지 않고 bulk단위로 동작한다. 즉, 레코드 단위로 insert 될 때 동작하지 않는다. S3와 같은 클라우드 데이터 스토리지에 적재하고 싶은 레코드를 파일 형태로 로딩해놓고 파일에서 한큐에 테이블로 bulk insert를 하게 되는데 그때 copy라는 것이 있고, copy를 쓸 때 Upsert를 쓸지 말지 결정할 수 있다.


Backfill과 Airflow

데이터의 크기가 작다면 오류가 발생한 지점으로 돌아가서 다시 실행할 필요가 없다. 오늘 실행한 것이 오류가 없었다면 (오늘 실행이 이전의 오류가 발생했던 부분까지 포함하고 있기 때문에)
그러나 incremental update의 경우, 특히 시간 단위로 업데이트하는 경우, 과거의 실패가 구멍이 되어버린다.

Backfill에 일관된 방법이 없다면, 코드를 이해하고 특정 날짜의 레코드를 재실행하는 것이 불가능하게 된다. 이러 부분을 일관되게 만들어주는 것이 Airflow가 가진 큰 강점
start_date과 execution_date을 이해해야 한다.

이제부터 할 이야기는 Incremental Update시에만 의미가 있음

  • 다시 한번 가능하면 Full Refresh를 사용하는 것이 좋음 -> 문제가 생겨도 다시 실행하면 됨
  • Incremental Update는 효율성이 더 좋을 수 있지만 운영/유지보수의 난이도가 올라감
    • 실수등으로 데이터가 빠지는 일이 생길 수 있음
    • 과거 데이터를 다시 다 읽어와야하는 경우 다시 모두 재실행을 해주어야함

Backfill의 용이성 여부 -> 데이터 엔지니어 삶에 직접적인 영향!

  • Backfill의 정의 : 실패한 데이터 파이프라인을 재실행 혹은 읽어온 데이터들의 문제로 다시 다 읽어와야하는 경우를 의미
  • Backfill 해결은 Incremental Update에서 복잡해짐
    • Full Refresh에서는 간단. 그냥 다시 실행하면 됨
  • 즉 실패한 데이터 파이프라인의 재실행이 얼마나 용이한 구조인가? -> 이게 잘 디자인된 것이 바로 Airflow

보통 Daily DAG를 작성한다고 하면 어떻게 할까?

● 지금 시간을 기준으로 어제 날짜를 계산하고 그 날짜에 해당하는 데이터를 읽어옴

from datetime import datetime, timedelta

# 지금 시간 기준으로 어제 날짜를 계산
y = datetime.now() - timedelta(1)
yesterday = datetime.strftime(y, '%Y-%m-%d') #어제 날짜 epoch를 날짜 포맷으로 변경

# yesterday에 해당하는 데이터를 소스에서 읽어옴
# 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"

그런데 지난 1년치 데이터를 Backfill 해야한다면?

위에서 본 코드는 어제의 레코드를 읽어올 뿐 1년치의 데이터를 읽어오지는 못한다.

  • 기존 ETL 코드를 조금 수정해서 지난 1년치 데이터에 대해 돌린다
  • 실수하기 쉽고 수정하는데 시간이 걸림
from datetime import datetime, timedelta

~~y = datetime.now() - timedelta(1)
yesterday = datetime.strftime(y, '%Y-%m-%d')~~

yesterday = '2023-01-01' # 이 부분을 365번 바꿔서 실행하던지 루프를 돌리는
걸로 변경

# yesterday에 해당하는 데이터를 소스에서 읽어옴
# 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"

여기에서는 하드코딩으로 고쳐서 돌렸지만 실제 현업에서는 코드가 훨씬 복잡하기 때문에 위와 같은 방법을 적용하기 어렵다. 상당한 시간을 써서 봐야 이해가 된다. 개발한 사람이 없거나 바쁠때, 내가 온콜인데 밤에 상황이 발생하면 시간소모

어떻게 ETL을 구현해놓으면 이런 일이 편해질까?

  • 시스템적으로 이걸 쉽게 해주는 방법을 구현한다
    - DAG별로 날짜시간별로 실행 결과를 기록하고 성공 여부 기록: 나중에 결과를 쉽게 확인
    - 이 날짜를 시스템에서 ETL의 인자로 제공
    - 데이터 엔지니어는 읽어와야하는 데이터의 날짜를 계산하지 않고 시스템이 지정해준 날짜 사용

    성공실패 기록 시 그 전 날 혹은 그 전 시간을 기록한다.
    코드를 바꾸어 재실행하는 것이 아니라, 실패한 시점에 가서 클리어를 해주면, 그 때 그 날짜 혹은 시간을 이용해 재실행된다.
    -> 개발자가 읽어와야 하는 날짜를 계산하는 것이 아니다. 모든 데이터 파이프라인에 스케줄이 있다. 스케줄을 보면 한 시간에 한 번 도는 것인지 하루에 한 번 도는 것인지 안다. 하루에 한 번이라면 그 전날, 한 시간에 한 번이라면 그 전 시간의 날짜와 시간을 알고 있기 때문에 이것을 변수로 넘겨줄 수 있다. 이것이 excution_date

정리하면, 시간을 따지는 것이 아니라 내가 만든 DAG의 실행주기를 Airflow가 이미 알고 있기 때문에 하루에 한 번 실행하는 DAG라면 전날 날짜를 주고, 한 시간에 한 번 도는 것이라면 전 시간의 날짜와 시간을 주도록 Airflow를 만든다.

# 지금 시간 기준으로 어제 날짜를 계산

# 기존 코드
y = datetime.now() - timedelta(1)
yesterday = datetime.strftime(y, '%Y-%m-%d') #어제 날짜 epoch를 날짜 포맷으로 변경

# 개선 코드 (python operator의 경우)
yesterday = context["execution_date”]

execution_date로 받아오기 때문에 코드를 단순화할 수 있다.
Airflow에서는 기존 코드 부분이 개선 코드로 바뀐다.

Airflow의 접근방식

  • ETL 별로 실행 날짜/시간과 결과를 메타데이터 데이터베이스에 기록 : Airflow는 full refresh 하는지 incremental update 하는지 모른다. 기본적으로 incremental update를 한다고 가정하고 실행 날짜, 시간, 결과를 메타 DB에 기록한다.
  • 모든 DAG 실행에는 “execution_date”이 존재하며 이게 바로 읽어와야할 데이터의 날짜와 시간
    • execution_date으로 채워야하는 날짜와 시간이 넘어옴
  • 이를 바탕으로 데이터를 갱신하도록 코드를 작성해야함
  • 이점: backfill이 쉬워짐

Airflow: 보통 Daily DAG를 작성한다고 하면 어떻게 할까?

운영과 Backfill이 동일한 코드로 가능해진다. 그것이 execution_date. 이것이 가능한 이유는 Airflow가 DAG의 실행주기를 알고, 성공 실패 여부를 알기 때문이다. 이런 정보들이 메타 데이터 디비에 기록이 된다.

from datetime import datetime, timedelta
yesterday = context["execution_date”]
# yesterday에 해당하는 데이터를 소스에서 읽어옴
# 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"

start_date: Daily Incremental Update를 구현해야 한다면?

start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 4 * * *',  # 적당히 조절

2022년 8월 24일 4시 0분에 DAG가 처음 실행될 것으로 생각되지만, 실제로는 2022년 8월 25일 오전 4시 0분에 처음 실행된다. start_date는 DAG가 incremental update를 한다는 가정 하에 처음 읽어와야 하는 데이터의 날짜이다.
즉, 2022년 8월 24일 데이터를 읽어오려면 그 다음날인 2022년 8월 25일에 가능하다.

start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
schedule = '30 * * * *',  # 적당히 조절

1시간에 1번씩 4시 30분에 실행되는 DAG(hourly) 2022년 8월 24일 오전 0시 30분에 처음 실행되는 것이 아니라 이 날의 데이터를 읽어오는 것이므로, 2022년 8월 24일 오전 1시 30분에 DAG가 처음 실행될 것이다.

예를 들어 2020년 11월 7일의 데이터부터 매일매일 하루치 데이터를 읽어온다고 가정한다.

  • 이 경우 언제부터 해당 ETL이 동작해야하나? -> 2020년 11월 8일

  • 다르게 이야기하면 2020년 11월 8일날 동작하지만 읽어와야 하는 데이터의 날짜는? -> 2020년 11월 7일: 이것이 start_date이 된다.

  • Airflow의 start_date은 시작 날짜라기는 보다는 DAG가 처음 읽어와야하는 데이터의 날짜

  • execution_date은 읽어와야하는 데이터의 날짜로 설정된다.

Airflow의 start_date는

  • DAG 관점 X
  • DATA 관점 O
    DAG가 처음 실행되기를 바라는 날짜와 시간이 아니라, 처음 읽어오고 싶은 데이터의 날짜와 시간이다.

start_date와 excution_date 이해하기

  • 2020-08-10 02:00:00로 start date 설정된 daily job이 있다. -> catchup이 True로 설정되어 있다고 가정 (디폴트가 True)
  • 지금 시간이 2020-08-13 20:00:00이고 처음으로 이 job이 활성화되었다. (start_date가 지났더라도 Airflow에서 활성화하지 않으면 DAG는 실행되지 않는다.)

Q : 이 경우 이 job은 몇번 실행될까? (execution_date)
1. 2020-08-10 02:00:00
2. 2020-08-11 02:00:00
3. 2020-08-12 02:00:00
4. 2020-08-13 02:00:00

2, 3, 4번이 실행된다.
2020-08-10의 데이터를 읽어오기 위해 이 날짜를 execution_date로 설정한다.
즉, 08-10(execution_date)을 읽기 위해 08-11 02:00에 실행되고, 08-11(execution_date)을 읽기 위해 08-12 02:00에 실행되고, 08-12(execution_date)를 읽기 위해 08-13 02:00에 실행된다.

각각 그 전날의 날짜와 시간이 execution_date로 들어가게 된다.



위의 사례는 @once를 통해 필요 시 한 번만 실행하면 되었으나, 습관적으로 날짜를 부여해서 daily로 반복되게 했고, 매 실행마다 돈이 부과되었다.
또 다른 문제점은 catchup 파라미터의 몰랐고, start_date와의 관계를 몰랐기 때문이다.

8월 6일을 start_date로 설정했다.
catchup은 디폴트가 True이다. True이면 Airflow에서 DAG를 활성화했을 때, start_date부터 지금까지 실행이 되지 않았던 회차가 연달아 실행된다.
따라서, 한 번 돌면 2000불인 쿼리가 8번 돌게 되었다.
쓴 만큼 돈이 부과되는 웨어하우스의 최대 단점. redshift 내가 산 용량 내에서 똑같은 비용을 낸다. 즉, scalable한 시스템이 꼭 좋은 것만은 아니다.

snowflake, bigquery를 사용하는 팀에게 매일 오전 혹은 전날밤에 sql을 비용 순으로 sorting해서 10개를 전체 팀에 뿌리게 했다. 경각심을 갖도록. 탓하기 위함이 아니라 어떻게 최적화할 수 있는지 세션을 연다. 비용에 대해 고민하도록. limit을 걸수도 있고, sql explain을 통해 비용을 미리 산정해볼 수 있다. by. Max

Backfill과 관련된 Airflow 변수들

  • start_date : DAG가 처음 실행되는 날짜가 아니라 DAG가 처음 읽어와야하는 데이터의 날짜/시간. 실제 첫 실행날짜는 start_date + DAG의 실행주기 (하루 한 번 실행되는 DAG라면 첫 실행날짜는 start_date+1 = 다음날이 될 것이다. 1시간에 한 번 실행되는 DAG라면 start_date+1시간)
  • execution_date : DAG가 읽어와야하는 데이터의 날짜와 시간
  • catchup : DAG가 처음 활성화된 시점이 start_date보다 미래라면 그 사이에 실행이 안 된 것들을 어떻게 할 것인지 결정해주는 파라미터. True가 디폴트값이고 이 경우 실행 안 된 것들을 모두 따라잡으려고 함. False가 되면 실행안된 것들을 무시함 (사고 발생 위험있으므로 주의해서 설정) -> Airflow 다운해도 업하면 다시 실행된다.
  • end_date : 이 값은 보통 필요하지 않으며 Backfill을 날짜 범위에 대해 하는 경우에만 필요
    - airflow dags backfill -s (start_date), -e(end_date) (DAG ID) -> 지난 1년치 데이터를 backfill해야 하는 경우 Airflow Command Line에서 다음과 같이 입력한다.
    - 만약 Daily Dag라면 execution_date를 하루씩 바꿔가면서 불러줄 것이다.

숙제

  1. 퀴즈
  • Airflow에서 하나의 DAG는 다수의 ()로 구성된다. ()에 들어갈 말은?

  • 매일 동작하는 DAG의 Start date이 2021-02-05라면 이 DAG의 첫 실행 날짜는?

    • 2021-02-05
    • 2021-02-06
    • 2021-02-05 01:00:00
    • 2021-02-07
  • 위 DAG의 경우 이때 execution_date으로 들어오는 날짜는?

  • Schedule interval이 "30 * * * *"으로 설정된 DAG에 대한 올바른 설명은?

    • 매일 0시 30분마다 한번씩 실행된다.
    • 매시 30분마다 한번씩 실행된다.
    • 일요일마다 매시 30분에 한번씩 실행된다.
  • Schedule interval이 "0 * * * *"으로 설정된 DAG의 start date이 "2021-02-04 00:00:00"으로 잡혀있다면 이 DAG의 첫 번째 실행 날짜와 시간은 언제인가?

  • Airflow의 DAG가 처음 ON이 되었을 때 start_date과 현재 날짜 사이에 실행이 안된 run들이 있을 경우 이를 실행한다. 이는 (??) 파라미터에 의해 결정된다. 이 파라미터를 False로 세팅하면 과거 실행이 안된 run을 무시한다.

    • execution_date
    • dag
    • catchup
  • 다음 중 Redshift에서 큰 데이터를 테이블로 복사하는 방식을 제대로 설명한 것은?
    - 하나씩 INSERT INTO를 실행하여 복사해준다.
    - 복사할 레코드들을 파일로 저장해서 한번 Redshift로 올린다.
    - 복사할 레코드들을 파일로 저장해서 S3로 올린 후에 거기서 Redshift로 벌크 복사한다.

  1. UpdateSymbol_v2의 Incremental Update 방식 수정해보기
    애플 주식 정보 읽어오는 DAG에서 중복처리를 DISTINCT로 했다. 한 날짜에 시간을 다르게 해서 미국 주식 시장이 열려있는 중에 호출하면 종가, 최저가, 최고가가 언제 불렀느냐에 따라 달라지기에 불완전한 중복 제거 방법이다. 같은 날짜에 대해서 다양한 수의 중복이 존재할 수 있다. 애플 주식임에도. 즉, 이 내용을 앞서 배운 ROW_NUMBER 방식을 사용해서 Primary key가 동일한 레코드들을 처리하기

ROW_NUMBER 사용을 위해 테이블에 아래 필드 추가

created_date timestamp default GETDATE()


Q&A

  • 그럼 만약 start_date가 2023.07.01이라고 했을 때 첫 데이터의 execution_date도 2023.07.01이고, 실제 dag가 실행되는 시간은 2023.07.02인 거죠? Daily 실행을 전제로 했을 때 -> YES

  • dag_dir_list_interval 이 값으로 인해 300초마다 dags들을 스캔하면서 실제 수행도 된다고 하셨는데, 최초로 올라갈 때만 실행을 하는건가요? -> NO. 무조건 실행이다.

    300초마다 코드들을 다 돌려보는 것. 코드를 돌려보면 무엇이 없어졌고, 무엇이 생겼는지 알 수 있다. 그런데 레코드를 삭제하는 파이썬 파일 등을 간편하게 하려고 작성하면 에어플로우가 DAG가 있는지 확인하려고
    에어플로우 관점에서 제대로 작성된 DAG란 DAG가 실행되는 것이 아니라 어떤 대그가 있고 테스크가 있는지 파악하고 끝나는 것이다. 그런데 일반 파이썬 파일(ex:레코드 삭제)이 있으면 실행하게 된다. 이를 막고 싶으면 .airflowignore를 만들어서 test로 시작하는 모든 파이썬 파일을 실행하지 말라 와 같은 조건을 설정하거나

    특정 파일 이름을 지정할 수 있고, 그냥 test를 쓰면 test를 포함하는 스크립트는 실행되지 않을 것이다.
    즉, DAG를 수행한다는 것이 아니라 파이썬 스크립트의 메인 함수가 실행된다는 것

  • Elasticsearch 같은 NoSql을 DW로 사용하는 경우도 있나요? -> 못 봄. 왜냐하면, 쿼리를 날려 데이터 분석을 하거나 하기는 어렵다. 문법도 다르고, 데분이 쉽게 쓸 수 없다. 그러나 엘라스틱서치 자체는 굉장히 강력한 툴이다.

profile
Data Analyst / Engineer

4개의 댓글

comment-user-thumbnail
2023년 7월 18일

글 잘 봤습니다, 감사합니다.

1개의 답글
comment-user-thumbnail
2023년 7월 18일

너무 좋은 글이네요. 공유해주셔서 감사합니다.

답글 달기
comment-user-thumbnail
2023년 7월 18일

뛰어난 글이네요, 감사합니다.

답글 달기