SELECT LEFT(created_at, 10) AS date,
ROUND(SUM(CASE WHEN score >= 9 THEN 1
WHEN score <= 6 THEN -1 END)::float*100/COUNT(1), 2)
FROM keeyong.nps
GROUP BY 1
ORDER BY 1;
위에서 만든 SQL을 맨 처음 시작하는 SELECT문에 넣어서 테이블 생성
input check : ctas를 실행하기 전에 미리 체크하는 부분. 하나라도 만족하지 않으면 에러 발생시킴
보통 count보다 결과가 클 경우에만 정상이라고 판단하고 넘어간다
count보다 작을 경우 에러 발생
{
'table': 'nps_summary',
'schema': 'keeyong',
'main_sql': """SSELECT LEFT(created_at, 10) AS date,
ROUND(SUM(CASE
WHEN score >= 9 THEN 1
WHEN score <= 6 THEN -1 END)::float*100/COUNT(1), 2)
FROM keeyong.nps
GROUP BY 1
ORDER BY 1;""",
'input_check':
[
{
'sql': 'SELECT COUNT(1) FROM keeyong.nps',
'count': 150000
},
],
'output_check':
[
{
'sql': 'SELECT COUNT(1) FROM {schema}.temp_{table}',
'count': 12
}
],
}
output check : 임시테이블 생성 후 체크해서 정상적일 경우, 임시테이블과 운영테이블을 바꿔치기
SSLECT LEFT
~ 구문이 들어감Build_Summary_v2.py
from airflow import DAG
from airflow.macros import *
import os
from glob import glob
import logging
import subprocess
from plugins import redshift_summary
from plugins import slack
DAG_ID = "Build_Summary_v2"
dag = DAG(
DAG_ID,
schedule_interval="25 13 * * *",
max_active_runs=1,
concurrency=1,
catchup=False,
start_date=datetime(2021, 9, 17),
default_args= {
'on_failure_callback': slack.on_failure_callback,
'retries': 1,
'retry_delay': timedelta(minutes=1),
}
)
# this should be listed in dependency order (all in analytics)
tables_load = [
'nps_summary'
]
dag_root_path = os.path.dirname(os.path.abspath(__file__))
redshift_summary.build_summary_table(dag_root_path, dag, tables_load, "redshift_dev_db")
build_summary_table 함수
on_failure_callback 옵션
fernet_key
라는 변수가 encyption key임AWS의 보안 ???
특정 리소스에 대한 접근을 가능하게 해주는....
동일한 AWS계정으로 서버를 생성하고 REDSHIFT를 생성해도 서로 커넥션을 맺어줘야함. 당연하지
Build_Summary_v2 : dag 이름
anayltics__nps_summary : task 이름
2021-12-24T13:25:00+00:00 : execution date
DataAlert : App 이름
직접 따라해보고 싶으면 슬랙을 하나 테스트용으로 만들고, admin권한이 있다면 실습 따라해볼 수 있음
연동하기 Sending messages using Incoming Webhooks
Webhook의 역할? : slack에 api endpoint가 생기고, 그 end point로 메시지를 보내면 webhook과 연동된 채널로 내가 보낸 메시지가 전달된다.
Webhook으로 메시지 보내기
curl -X POST -H 'Content-type: application/json' --data '{"text":"Hello, World!"}'
https://hooks.slack.com/services/T016X1V5HBQ/B02QB4GGNQM/xone4l4N3gMLTQRn
RBWYaZ9y
airflow 파이프라인 실패/경고를 슬랙으로 보내는 방법
default_args= {
'on_failure_callback': slack.on_failure_callback,
'retries': 1,
'retry_delay': timedelta(minutes=1),
}
AIRBNB에서 airflow 만든 사람이 또 만든..;;; 시각화 BI툴
AIRBNB내부에서는 여전히 메인 대시보드로 사용하는 중
사용자 5명까지 공짜로 사용할 수 있음
다양한 형태의 visualization와 손쉬운 인터페이스 지원
• 대시보드 공유 지원
• 엔터프라이즈 수준의 보안과 권한 제어 기능 제공
• SQLAlchemy와 연동
– 다양한 데이터베이스 지원
• Druid.io와 연동
기타 BI/Dashborad툴
Superset Architecture & Terminology