데이터 엔지니어링 스터디 4주차

skh951225·2023년 4월 16일
0

들어가기 앞서

<질문 1> 멱등성이 무엇인가요?

  • 데이터 파이프라인이 연속 실행되었을 때 소스에 있는 데이터가 그대로 DW로 저장되어 있어야함
  • No duplicates, no missing data
  • 멱등성을 유지하면서 데이터를 업데이트 하는 방법은 크게 두 가지가 있다.
    • Incremental update
    • Full refresh
  • Full refresh를 할 때 발생할 수 있는 문제점
    • DW에 있는 데이터를 모두 날렸을때 다른 사람이 DW를 읽으면 비어있는 것 처럼 보임
    • Data를 DW에 복사하는 과정에 에러가 나면 데이터의 정합성이 깨짐
    • → transaction으로 해결

<질문 2> Full refresh, Incremental update 어떤 것을 사용해야하나요?

  • 초기에는 데이터의 규모가 작아서 Full refresh 방법을 보통 쓴다. 간단하니까
  • 사람들이 그 데이터를 언제 쓰느냐에 따라 다르겠지만 보통 Hourly job의 경우 update에 소요되는 시간이 30분이상, Daily job의 경우 update에 소요되는 시간이 6시간 이상걸리면 Incremental update를 쓰는 것을 추천

<질문 2> 간혹 통신 딜레이 등으로 시간이 조금 늦게 들어오는 데이터에 대해 약간의 마진을 주어 작업하는 것이 안전할까요?

  • 네. daily job의 경우 schedule="10 2 * * *" 와 같이 설정하는 방법을 고려해볼 수 있습니다. 마진을 얼마나 줄지는 상황에 맞게 설정하시면 됩니다.

Transaction

  • Atomic하게 실행되어야 하는 SQL 쿼리들을 묶어서 하나의 작업처럼 처리하는 방법
  • BEGIN; {원하는 쿼리들} COMMIT;/END; 와 같은 형태로 씀
  • DB에 부담을 주지 않기 위해 필요한 만큼만 최소한의 SQL을 포함시키는 것이 좋음
  • 아직 Commit을하지 않으면 DB에는 반영하지 않지만 나한테는 변경사항이 보임
  • 만약 아직 Commit을 하지않은 쿼리에 대해서 취소하고 싶다면 ROLLBACK;을 하면됨
  • autocommit=True/False
    • True로 설정할 경우 SQL query가 DB에 바로 반영됨. 물론 이경우에도 BEGIN;, COMMIT;/END;를 사용해 Transaction으로 묶어서 사용할 수 있다.
    • False로 설정할 경우 SQL query가 DB에 바로 반영되지 않고 COMMIT;을 해줘야 반영됨
  • Python의 경우 try/except를 사용하는 것이 일반적
    • try ~ commit / except rollback raise
    • error가 발생하였더라도 commit을 해주면 자동으로 rollback되지만 명시적으로 rollback을 실행해주는 것이 좋음
    • ETL을 관리하는 입장에서 except에 raise로 어떤 error가 발생하였는지 나타내는 것이 좋다. 예상치 못한 문제를 예방하기 위해

Airflow

Airflow 설치 방법

  1. docker를 기반으로 설치
  2. 직접 설치하는 방법
  • Ec2 T3.small (2 CPU, 2GB memory, SSD 8GB), Ubuntu 20.04를 이용해서 airflow 모듈, db를 직접 설치하고 구성
  • 어느정도 규모가 되면 t3.medium, t3.large를 써야함

Airflow 서버에 접속해서 명령어로 다뤄보기

$ airflow dags list # dag list를 보여줌
$ airflow tasks list {dag_id} # dag에 포함된 task list를 보여줌
$ airflow dags test {dag_id} {execution_date} # dag를 실행
$ airflow tasks test {dag_id} {task_id} {execution_date} # task를 실행
$ airflow dags backfill {dag_id} -s {start_date} -e {end_date} # backfill

Parameter

  • dag에 적용되는 특별한 parameter
    • schedule='@once' : 주기적으로 실행되는 것이 아니라 어떤 trigger로 인해 실행되는 것
    • max_active_runs: 동시에 도는 dag의 최대치 -> full refresh의 경우 1로 설정
    • max_active_tasks: 동시에 도는 task의 최대치
    • max_active_runs, max_active_tasks는 cpu의 수가 자동으로 upper bound
    • default_args에 지정하는 값은 task level로 적용됨
  • parameter로 쓸 값을 variable, connection으로 등록하기
    • 코드에 노출되어서는 안되는 중요한 정보를 숨길 수 있다.
    • Web UI - Admin - Variables/Connections
  • Xcom을 통해 task의 Return 값을 다른 task로 전달할 수 있다.
    • Xcom으로 전달할 수 있는 데이터의 최대 크기는 48kb 이를 초과할시 S3를 이용

DW에서 Primary Key Uniqueness를 보장하는 방법

데이터 웨어하우스에서 Primary Key Uniqueness를 보장하기 위한 방법에 대
1. CTAS로 임시테이블 생성
2. 임시 테이블(stagin table)로 레코드들을 추가
3. 기존 테이블 DELETE, 중복을 없앤 형태로 새로운 테이블 생성

  • 보통 timestamp를 기준으로 가장 최근 데이터를 사용
INSERT INTO skh951225.weather_forecast
SELECT date, temp, min_temp, max_temp, created_date
FROM (
 SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
 FROM skh951225.temp_weather_forecast
)
WHERE seq = 1;

0개의 댓글