[7주차] ending

zuckerfrei·2024년 2월 12일
0

6주차 숙제 리뷰

숙제3-1. 일별 NPS를 SQL로 계산

SELECT LEFT(created_at, 10) AS date,
  ROUND(SUM(CASE WHEN score >= 9 THEN 1 
              	 WHEN score <= 6 THEN -1 END)::float*100/COUNT(1), 2)
FROM keeyong.nps
GROUP BY 1
ORDER BY 1;

숙제3-2. NPS SQL을 주기적 요약테이블로 만들기

위에서 만든 SQL을 맨 처음 시작하는 SELECT문에 넣어서 테이블 생성

  • config/nps_summary.py 라는 파일을 생성했고, 이 파일은 그냥 dict임
    • → ctas로 테이블을 만들고 데이터가 1건이라도 들어갔는지 체크하는 파일.
    • 기존 nps_summary.py의 기능과 동일하지만 별도의 파일로 뺀 것임
    • 코딩 하지 않는 데이터 분석가들이 주로 사용하는 방법. config 폴더에서 python dict형식으로 간단하게 코딩해도 되니까
  • inputcheck, output check도 있음
    • input check : ctas를 실행하기 전에 미리 체크하는 부분. 하나라도 만족하지 않으면 에러 발생시킴

    • 보통 count보다 결과가 클 경우에만 정상이라고 판단하고 넘어간다

    • count보다 작을 경우 에러 발생

      {
                'table': 'nps_summary',
                'schema': 'keeyong',
                'main_sql': """SSELECT LEFT(created_at, 10) AS date,
        ROUND(SUM(CASE
          WHEN score >= 9 THEN 1 
          WHEN score <= 6 THEN -1 END)::float*100/COUNT(1), 2)
      FROM keeyong.nps
      GROUP BY 1
      ORDER BY 1;""",
                'input_check':
                [
                  {
                    'sql': 'SELECT COUNT(1) FROM keeyong.nps',
                    'count': 150000
                  },
                ],
                'output_check':
                [
                  {
                    'sql': 'SELECT COUNT(1) FROM {schema}.temp_{table}',
                    'count': 12
                  }
                ],
      }
    • output check : 임시테이블 생성 후 체크해서 정상적일 경우, 임시테이블과 운영테이블을 바꿔치기

  • 일부러 에러발생시키기 위해 SSLECT LEFT ~ 구문이 들어감

Build_Summary_v2.py

from airflow import DAG
from airflow.macros import *

import os
from glob import glob
import logging
import subprocess

from plugins import redshift_summary
from plugins import slack

DAG_ID = "Build_Summary_v2"
dag = DAG(
    DAG_ID,
    schedule_interval="25 13 * * *",
    max_active_runs=1,
    concurrency=1,
    catchup=False,
    start_date=datetime(2021, 9, 17),
    default_args= {
        'on_failure_callback': slack.on_failure_callback,
        'retries': 1,
        'retry_delay': timedelta(minutes=1),
    }
)

# this should be listed in dependency order (all in analytics)
tables_load = [
    'nps_summary'
]

dag_root_path = os.path.dirname(os.path.abspath(__file__))
redshift_summary.build_summary_table(dag_root_path, dag, tables_load, "redshift_dev_db")
  • build_summary_table 함수

    • redshift_summary가 지원하는 함수로, 서머리 테이블을 생성시켜주는 함수이다.
    • 내부적으로는 temp 테이블을 사용한다(지금까지 우리가 배운 것 처럼)
    • dag_root_path : 이 경로 밑에 있는 config폴더의 파일들을 쭉 읽어옴
    • dag : dag object생성
    • tables_load : 서머리 테이블 이름이 들어간 리스트. 이 인자의 리스트 길이만큼 task가 생성되고 순차적으로 실행됨
    • redshift_dev_db : redshift connection object 이름
  • on_failure_callback 옵션

    • dag생성시 여기에 어느 함수이름을 파라미터로 주면, dag실행 중 에러가 발생할 경우 그 정보를 파라미터로 적어놓은 함수에게 넘겨줌
    • slack, mail, sms 등등 알림 가능함
    • 이 예시에서는 plugins라는 패키지에 slack.py를 작성했고, 이 py파일에 있는 on_failure_callback라는 함수를 파라미터로 적은 것임

Airflow Configuration for Production Usage

  • airflow 1대로 충분하면 그냥 그대로 써라
  • 만약 다수의 airflow를 사용하게 될 경우
    • 그냥 클라우드업체에서 제공하는 aws mwaa, gcp cloud composer같은 것을 사용 하는 것이 편하다
    • 멀티노드도 지원함
    • aws mwaa는 노드맞게 워커 수를 동적으로 늘리고 줄이고 해줘서 굳이 내가 서버를 얼마나 쓸지 고민하지 않아도 됨. but aws의 connection manager를 사용해야만해서 기존 airflow 코드를 그대로 사용할 수 없어서 귀찮은 단점(기존에는 PostgresHook 같은거 썼는데 mwaa는 이게 아니라고 함)

airflow 운영시 주의사항

  • 설정파일은 airflow.cfg이고, 파일 수정시 webserver, scheduler를 재시작해야 반영됨
  • 최초 설치시 sqlite가 설치되는데 이를 postgres, mysql같은 다른 db로 변경해야함
    • 반드시 같은 서버의 db를 쓰지않아도 되고, aws rds같은 서비스를 사용하여 외부 db로 변경해도 가능함(rds의 자동백업 기능도 쓸 수 있다는 장점)
  • airflow databse를 백업하자
    • 어느 dag를 생성하고, 그 dag는 airflow database를 매일 백업해놓도록 하는 것도 방법
    • 자동으로 암호화하여 저장하기 때문에 encyption key가 있어야함 - airflow.cfg 파일의 fernet_key라는 변수가 encyption key임
    • 따라서 암호키가 있어야 백업해놓은 데이터를 볼 수 있기 때문에 airflow.cfg파일도 잘 백업해야한다.
  • LocalExecutor 사용
    • 서버 1대에서 운영할 경우에는 LocalExecutor사용하기
    • 멀티워커일 경우 localexecutor 못씀 → 그때는 celelyexecutor같은 것을 써야함
    • executor가 코딩에 영향을 미치지는 않음 but 운영환경에 맞게 변경해줘야함
  • airflow 2.0 버전부터 web UI에 login 기능이 추가되어 보안 강화
    • 1.0버전에서는 없었어서 아무나 8080포트로 들어오기도 했음
  • 계속 다양한 로그파일이 생성되는데 서버 디스크용량이 충분하지 않으면 금방 꽉차게됨
    • 마찬가지로 airflow.cfg 파일의 base_log_folder, child_process_log_directory 설정
    • 주기적으로 로그 삭제 or 로그파일 이동
    • dag + shell Operator를 활용할 수 있음
  • 서버 1대 이상을 사용할 경우
    • 계속 1대를 사용하되 더 고성능 서버를 사용하는 것을 추천
    • 그래도 안 된다 싶으면 스케일아웃해서 워커노드를 더 붙인다
    • 이럴 경우 여유가 있으면 클라우드로 갈아타라 → 운영이 훨씬 쉬워짐

AWS/Airflow 보안 관련

IAM

AWS의 보안 ???
특정 리소스에 대한 접근을 가능하게 해주는....
동일한 AWS계정으로 서버를 생성하고 REDSHIFT를 생성해도 서로 커넥션을 맺어줘야함. 당연하지

Slack 연동하기

  • airflow에서 task가 fail 할 때 마다 fail 메시지를 전달해주는 채널을 만드는 것

Build_Summary_v2 : dag 이름
anayltics__nps_summary : task 이름
2021-12-24T13:25:00+00:00 : execution date
DataAlert : App 이름

  • 직접 따라해보고 싶으면 슬랙을 하나 테스트용으로 만들고, admin권한이 있다면 실습 따라해볼 수 있음

  • 연동하기 Sending messages using Incoming Webhooks

    • 1) 이 링크 따라해서 incomming Webhooks App 생성하기
    • 2) 생성한 Webhook과 특정 채널을 연결시킴
  • Webhook의 역할? : slack에 api endpoint가 생기고, 그 end point로 메시지를 보내면 webhook과 연동된 채널로 내가 보낸 메시지가 전달된다.

  • Webhook으로 메시지 보내기

    • 예시 전문
      curl -X POST -H 'Content-type: application/json' --data '{"text":"Hello, World!"}' 
      https://hooks.slack.com/services/T016X1V5HBQ/B02QB4GGNQM/xone4l4N3gMLTQRn
      RBWYaZ9y
    • https://hooks.slack.com/~~~ : end point
    • 이 end point로 Hello, world 라는 메시지를 전달
  • airflow 파이프라인 실패/경고를 슬랙으로 보내는 방법

    • end point를 “slack_url”이라는 Variable로 저장(T016X1V5HBQ/B02QB4GGNQM/xone4l4N3gMLTQRn
      RBWYaZ9y) 반드시 slack_url일 필요는 없구 그냥 예시로
    • slack에 에러 메시지를 보내는 별도 모듈 작성 : slack.py
    • 이를 DAG 인스턴스를 만들 때 에러 콜백으로 지정
      default_args= {
              'on_failure_callback': slack.on_failure_callback,
              'retries': 1,
              'retry_delay': timedelta(minutes=1),
          }
  • slack뿐만 아니라 다른 서비스를 airlfow와 연동 시킬 수 있으니 많이 생각해보면 좋다.
    • 구글 스프레드시트와 redshift연동하면 매우 편리하다
    • 구글 스프레드시트 내용을 redshift의 테이블로 읽거나 반대로 할 수 있게됨

Superset

소개

  • AIRBNB에서 airflow 만든 사람이 또 만든..;;; 시각화 BI툴

  • https://preset.io/

  • AIRBNB내부에서는 여전히 메인 대시보드로 사용하는 중

  • 사용자 5명까지 공짜로 사용할 수 있음

  • 다양한 형태의 visualization와 손쉬운 인터페이스 지원
    • 대시보드 공유 지원
    • 엔터프라이즈 수준의 보안과 권한 제어 기능 제공
    • SQLAlchemy와 연동
    – 다양한 데이터베이스 지원
    • Druid.io와 연동

  • 기타 BI/Dashborad툴

    • 돈 걱정 없으면 Looker쓰는 것이 제일 좋고. 약간의 러닝커브 있지만 한 번 배우면 수정하기 좋게 잘 만듦. 임베딩 지원도 좋아서 외부에 대시보드를 판매하기도 좋음. 실리콘밸리에서는 요즘 거의 Looker를 사용하더라. 태블로 쓰는 곳 최근 2~3년간 못봤다고 함
    • tableau는 한때 1위였고, 대시보드 수정할 때 고통스러워서 단점.
  • Superset Architecture & Terminology

    • Based on Flask and React JS
    • Uses a metadata database : 기본은 Sqlite, 변경해야함
    • SqlAlchemy로 여러 DB지원
    • Database/Dataset
      • Database : 관계형데이터베이스를 의미함(== Redshift)
      • Dataset : 테이블을 의미함 (== Summary Table)
    • Chart : 그래프 1개
    • Dashboard : 차트의 집합. 그래프가 하나 이상 여러 개 모인 것

설치

  • 어느 정도 성능이 좋은 서버에 docker로 설치하는 것을 추천

구성 및 차트 작성

  • 강의 자료 확인해서 실습해보기

Next Steps

  • 이번 강좌에서 배운 것은 데이터 웨어하우스를 기반으로 데이터 인프라를 만드는 것
  • Data Analyst와 어떻게 협업해야하는지에 더 집중 (Summary Table & Dashboard)
  • 다음 스텝은 Data Scientist와 어떻게 협업을 해야하는지에 대해 배워보는 것
    • Feature 계산
      • 이는 큰 스케일에서는 Feature 계산을 위한 Spark과 같은 대용량 계산 프레임웍에 대한 지식을 필요
      • 작은 스케일에서는 Python의 Pandas와 같은 라이브러리 사용이 필요
    • Model Serving
      • Data Scientist가 만든 모델을 어떻게 프로덕션으로 론치할 수 있는지?
    • A/B Test
  • MLOps와 ML Engineer
profile
무설탕 음료를 좋아합니다

0개의 댓글