autocommit 의미
부분적으로 성공하는 것이 없도록 하는 것이 포인트!
autocommit이 true일 경우
delete from vs truncate
트랜잭션이란?
트랜잭션은 다수의 sql을 묶어서 하나처럼 사용하는 것임
rollback 안 해도 마치 rollback한 것 같은 효과
트랜잭션 구현방법
autocommit 이 true이든, false이든 python try/except로 컨트롤 가능하다
Log 버튼을 누르면 실행한 task의 로그가 출력됨
airflow dags list
airflow tasks list [dag이름]
airflow tasks test [DAG이름] [Task이름] [날짜]
test : 아무데도 기록이 남지 않음
run : 메타db에 기록이 남음
airflow dags **test** [DAG이름] [날짜]
예시1) airflow dags test MySQL_to_Redshift_v3 2019-12-08
예시2) airflow dags backfill MySQL_to_Redshift_v3 -s 2019-01-01 -e 2019-12-31
dag선언시 추가로 넣을 수 있는 파라미터 목록
이 값들을 파라미터로 추가할 수 있는데, 만약 추가하지 않을 경우 airflow.cfg파일에 선언된 값을 가져와서 사용함
redshift 연결정보 변수를 의미함
안전하게 관리하기 위해 코드와 분리시킨다
Connections
Variables
Variable
Connections
지금까지는 간단한 dag였고,, 현장에서는 api로 데이터 불러오고, db에 저장하는 것이 훨씬 일반적이다.
무료버전의 날씨 api를 사용해서 데이터를 받아오고, dw에 적재한다
upsert = intert + update
대부분의 DW는 upsert를 지원하지 않음, pk를 보장하지 않기 때문에 upsert를 지원하지 않음
즉 upsert사용 = pk 유일성을 보장해준다는 의미임
window함수의 row_number를 사용해서 pk처럼 구별한다.
CREATE TABLE keeyong.weather_forecast (
date date primary key,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
예를 들어, 이 테이블에서 pk처럼 유일한 식별자 or upsert를 찾을 경우
1) date 컬럼을 기준으로 partition처리(같은 date끼리 group by하듯이 묶음)
2) 그리고 row_number를 사용하여 일련번호 붙이는데, 이럴 경우 timestamp인 created_date컬럼을 기준으로 가장 최근의 데이터를 1번부터 붙임
3) 그래서 생성된 일련번호 1인 것들만 가져와서 새로운 테이블을 만듦
새 테이블 생성 - CTAS
CREATE TABLE keeyong.temp_weather_forecast AS SELECT * FROM keeyong.weather_forecast;
DAG는 임시 테이블(스테이징 테이블)에 레코드를 추가
원본 테이블 삭제 DELETE FROM keeyong.weather_forecast;
최신데이터만 골라서 기존 테이블에 적재
INSERT INTO keeyong.weather_forecast
SELECT *,
ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
FROM keeyong.temp_weather_forecast
WHERE seq = 1;
위의 코드는 매번 새로 덮어쓰는 형식의 업데이트를 가정
admin > connections
admin > variables
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime
from datetime import timedelta
import time
import requests
import logging
import psycopg2
import pprint
def get_Redshift_connection():
hook = PostgresHook(postgres_conn_id='redshift_dev_db') # connections에 미리 등록해놓음
return hook.get_conn().cursor()
def extract(**context):
base_url = context["params"]["base_url"]
lat = context["params"]["lat"]
lon = context["params"]["lon"]
part = context["params"]["part"]
api_key = context["params"]["api_key"]
task_instance = context['task_instance']
execution_date = context['execution_date']
logging.info(execution_date)
link = base_url.format(lat=lat, lon=lon, part=part, api_key=api_key)
res = requests.get(link)
result = res.json()
return result
def transform(**context):
lines = []
data = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
for day in data["daily"]:
# pprint(day) # 하루 데이터
# print(time.strftime("%Y/%m/%d", time.localtime(day['dt']))) # 날짜
# pprint(day['temp']['day']) # 낮 온도
# pprint(day['temp']['min']) # 최저 온도
# pprint(day['temp']['max']) # 최고 온도
date = str(time.strftime("%Y/%m/%d", time.localtime(day['dt'])))
temp = str(day['temp']['day'])
min_temp = str(day['temp']['min'])
max_temp = str(day['temp']['max'])
lines.append(','.join([date, temp, min_temp, max_temp]))
return lines
def load(**context):
schema = context["params"]["owa_schema"]
table = context["params"]["owa_table"]
ddl = context["params"]["owa_ddl"]
cur = get_Redshift_connection()
lines = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")
lines = iter(lines)
next(lines)
sql = "BEGIN; DELETE FROM {schema}.{table}; {ddl};".format(schema=schema, table=table, ddl=ddl)
for line in lines:
if line != "":
(date, temp, min_temp, max_temp) = line.split(",")
logging.info(f"{date} - {temp} - {min_temp} - {max_temp}")
sql += f"""INSERT INTO {schema}.{table} VALUES ('{date}', '{temp}', '{min_temp}' , '{max_temp}', default);"""
sql += "END;"
logging.info(sql)
cur.execute(sql)
dag_open_weather_api = DAG(
dag_id='dag_open_weather_api',
start_date=datetime(2021, 12, 1), # 날짜가 미래인 경우 실행이 안됨
schedule_interval='1 * * * *', # 적당히 조절
max_active_runs=1,
catchup=False,
default_args={
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
extract = PythonOperator(
task_id='extract',
python_callable=extract,
params={
'base_url': Variable.get("base_url"),
'lat': Variable.get("lat"),
'lon': Variable.get("lon"),
'part': Variable.get("part"),
'api_key': Variable.get("api_key")
},
provide_context=True,
dag=dag_open_weather_api)
transform = PythonOperator(
task_id='transform',
python_callable=transform,
params={
},
provide_context=True,
dag=dag_open_weather_api)
load = PythonOperator(
task_id='load',
python_callable=load,
params={
'owa_schema': Variable.get("owa_schema"),
'owa_table': Variable.get("owa_table"),
'owa_ddl' : Variable.get("owa_ddl")
},
provide_context=True,
dag=dag_open_weather_api)
extract >> transform >> load
"""
<Connections>
Conn Id : redshift_dev_db
Conn Type : Amazon Redshift
Host : learnde ~~ redshift.amazonaws.com
Port : 5439
<Variables>
(key, val)
api_key, ********(open_weather_api key)
base_url, https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&exclude={part}&appid={api_key}&units=metric
lat, 37.5683
lon, 126.9778
owa_ddl, CREATE TABLE IF NOT EXISTS seonmin1219.weather_forecast ( date date primary key, temp float, min_temp float, max_temp float, created_at timestamp default sysdate );
owa_schema, seonmin1219
owa_table, weather_forecast
part, current,minutely,hourly,alerts
"""
airflow api를 호출해서 환경 설정 파일을 변경할 수 있음
다음 주
가상의 production mysql db를 생성하고, redshift dw에서 이 mysql db로 데이터를 옮기는 작업을 할 예정.
Q. sql 문자열이 너무 길어지는 부분에서는 리스크가 없나요?
A. 네 있습니다. 너무 길어지지 않게 주의해주세요
Q. 선생님
일전 질문에서 nifi 는 etl 개발에 초점이 맞춰져 있고 airflow 는 관리에 초점이 맞춰져 있다고 하셨었는데
혹시 airflow 에서도 nifi 와 같이 etl 을 개발할 수 있을까요?
A. airflow는 etl개발, etl관리 둘 다 가능하다.