[TIL 44일자] 데브코스 데이터엔지니어링

·2023년 6월 10일
0

데브코스

목록 보기
40/55
post-thumbnail

📚 오늘 공부한 내용

1. airflow와 Timezone

  • airflow.cfg에는 두 종류의 Timezone 관련 키가 존재하는데 default_timezone, default_ui_timezone이 있다.
  • start_date (시작일), end_date(종료일), schedule - default_timezone에 지정된 Timezone을 따른다.
  • execution_date로그 시간
    • 항상 UTC를 따르게 되어 있음
    • 그렇기 때문에 UTC를 일관되게 사용하는 것이 변환이 필요 없어서 선호됨

2. DAGs 폴더에서 코딩 작성 시 주의할 점

  • Airflow는 dags 폴더를 dag_dir_list_interval의 설정 타임에 따라 주기적으로 스캔되고, default로는 5 분에 한 번씩 스캔된다.
  • 이때 DAG 모듈이 들어 있는 모든 파일들의 메인 함수가 실행되기 때문에 이 경우 본의 아니게 개발 중인 테스트 코드도 실행될 수 있다. -> 주의해야 할 것

3. Primary Key Uniqueness

1) Primary Key Uniqueness란?

  • 테이블에서 하나의 레코드를 유일하게 지칭할 수 있는 필드를 Primary Key이다.
  • 관계형 데이터베이스 시스템은 Primary Key의 값 중복 존재를 막아 주는데 데이터 웨어하우스에서는 Primary Key의 유일성을 보장해 주지 않는다.
  • 만약 Primary Key가 다수가 된다면 Foreign Key가 될 수도 있다. 사실 Foreign Key는 Create 시에는 적어 주지 않아도 되는데 데이터의 정합성을 체크해 주기도 하고 다른 테이블에 해당 Key가 존재한다는 걸 알 수 있게 해 주기 때문에 쓰기도 한다.

2) 왜 빅데이터 기반 데이터 웨어하우스들은 Primary Key를 보장해 주지 않을까?

  • 테이블의 많은 레코드들의 Primary Key를 모두 리스트에 올려 놓고 Look up을 해 주어야 한다.
  • 그렇기 때문에 시간이 많이 들고, 메모리의 리소스도 많이 차지하게 된다.
  • 오히려 유일성을 보장하게 되면 대용량 데이터의 적재의 걸림돌이 된다.
  • 그렇기 때문에 Primary Key를 보장해 주어야 하는 것은 ETL을 할 때는 데이터 엔지니어가, ELT를 할 때는 데이터 분석가가 작업해 주어야 한다.

3) 그렇다면 어떻게 Primary Key Uniqueness를 보장해 줄 수 있을까?

CREATE TABLE keeyong.weather_forecast (
 date date primary key,
 temp float,
 min_temp float,
 max_temp float,
 created_date timestamp default GETDATE()
);
  • 해당 테이블을 예시로 설명을 한다면 Full Refresh가 아닌 Incremental Update로 구현을 하게 된다면 Primary key Uniqueness를 보장할 수 있다.

  • 이때 created_date 필드가 필요해진다.

  • default GETDATE()를 통해 적재된 날짜가 저장되게 되는데 해당 테이블은 날씨 정보이기 때문에 최근 정보가 더 신뢰할 수 있는 정보이고 신뢰도에 따라 중복 데이터가 발생하였을 때 더 중요한 데이터를 적재해 줄 수 있다.

  • 즉, created_date가 더 최근인 정보를 적재하는 것이다.

  • 그렇다면 created_date가 더 최근인 정보임을 어떻게 비교하고 적재할 수 있을까?

    • ROW_NUMBER()를 사용해 준다.
    • CREATED_DATE가 큰 것(더 최근의 것)부터 정렬될 수 있도록 ORDER BY를 해 준다.
    • 그룹핑의 기준을 DATE로 해서 중복이 발생하지 않도록 해 준다.
ROW_NUMBER() OVER(PARTITION BY DATE ORDER BY CREATED_DATE DESC) SEQ
  • 정리해 보면 임시 테이블을 만들고 현재 모든 레코드를 임시 테이블로 복사해 준다.
  • 임시 테이블에 새로 데이터 소스에서 읽어 들인 레코드들을 복사하는데 이 과정에서는 중복이 존재할 수 있다.
  • 중복을 제거해 줄 수 있는 SQL 작성한다.
    • 최신 레코드를 우선 순위로 정렬 및 선택
    • ROW_NUMBER를 이용해서 PRIMARY_KEY를 PARTITION 기준으로 잡고, 데이터의 적재 타임 필드를 ORDER BY 기준으로 잡는다.
  • 해당 SQL 결과를 최종 테이블로 복사한다.

4. Upsert란?

  • Primary Key를 기준으로 존재하는 레코드라면 새 정보로 수정한다.
  • 존재하지 않는 레코드라면 새 레코드로 적재한다.
  • 보통 데이터 웨어하우스마다 UPSERT를 효율적으로 해 주는 문법을 지원해 준다.

5. Incremental Update가 실패하면?

  • 만약 5 월 24 일, 5 월 25 일에 실패했다면 5 월 24 일은 23 일, 25 일은 24 일 정보를 읽어오도록 되어 있기 때문에 빠지는 데이터가 생기게 된다.
  • 이렇기 때문에 backfill이 필요하게 되는데 airflowbackfill이 쉽게 된다.
  • backfillIncremental Update를 할 때만 사용할 수 있다.
  • Incremental Update는 효율성이 좋지만 유지보수와 운영의 난이도가 올라간다.

6. Backfill

  • Backfill은 실패한 데이터 파이프라인을 재실행 혹은 읽어온 데이터들의 문제로 다시 다 읽어와야 하는 경우를 의미한다.
  • Full Refresh는 그냥 다시 실행해 주면 되지만 Incremental Update는 복잡해지기 때문에 해당 내용을 구현할 때 Backfill을 더 쉽게 할 수 있는 법까지 생각해 구현해야 한다.

✍ 어떻게 ETL을 구현해 놓으면 이런 일이 편해질까?

  • 날짜별로 backfill 결과를 기록하고 성공 여부를 기록한다.
  • 날짜를 시스템에서 ETL 인자로 제공하여 데이터 엔지니어는 읽어와야 되는 데이터의 날짜를 계산하지 않고 시스템이 지정해 준 날짜를 사용하게 한다.
  • airflow는 모든 날짜에 대해서 실행이 실패했는지 성공했는지 그리고 그에 따른 날짜를 기록해 둔다. 모든 DAG 실행 시에는 execution_date이 지정되어 있는데 airflow에서 execution_date를 읽어온 후 이 날짜를 바탕으로 데이터 갱신이 되도록 코드를 작성해 준다.
  • 이를 통해 backfill이 쉬워진다.

7. Daily Incremental Update를 구현해야 한다면?

  • 언제 데이터부터 매일 하루치 데이터를 읽어올 건지 결정하고, ETL 동작이 시작하는 날짜를 start_date 값으로 설정해 준다.
  • 즉, start_date는 처음 읽어와야 하는 날짜라고 생각하면 된다. data의 start_date가 된다. 만약 @daily DAG라면 2020-11-08 데이터를 읽어와야 한다면 2020-11-07로 설정해 주어야 다음 날 DAG가 처음 실행된다. (주기에 따라 지정해 준 날짜를 기점으로 다음에 실행되기 때문)
  • 이때 `execution_date1는 시스템 변수로 읽어와야 하는 데이터 날짜를 지정해 준다.

✍ start_date와 execution_date를 좀 더 이해해 보자.

  • 2020-08-10 02:00:00로 start_date가 설정된 Daily job이 있다. 이때 catchup은 True.
  • 지금 시간은 2020-08-13 20:00:00이고 처음 이 job이 활성화되었다.
  • 이때 이 job은 몇 번 실행될까? (execution_date)

<내 생각>
catchup이 True라는 것은 실행되지 않은 이전 시간에 대한 작업이 실행된다는 것이고, 시작일자를 2020-08-10 02:00:00로 두었다면 그 다음 일자인 2020-08-11 02:00:00부터 작업이 실행될 것이다. 현재 시간은 2020-08-13 20:00:00이기 때문에 13 일의 작업까지 실행되어야 맞다.
그렇다면,

  • 2020-08-11 02:00:00
  • 2020-08-12 02:00:00
  • 2020-08-13 02:00:00
    세 번 실행된다.

<풀이>

  • 풀이 들었는데 생각한 것과 동일했음. 2020-08-10 02:00:00는 실행되지 않고 2020-08-13 20:00:00은 02:00:00에 실행되므로 총 세 번 실행된다.

8. Backfill과 관련된 변수

1) start_date

  • DAG가 처음 실행되는 날짜가 아니라 DAG가 처음 읽어와야 하는 데이터의 날짜와 시간. 실제 첫 실행일자는 start_date + DAG의 실행 주기.

2) execution_date

  • DAG가 읽어와야 하는 데이터의 날짜와 시간

3) catchup

  • DAG가 처음 활성화된 시점이 start_date보다 미래라면 그 사이에 실행 안 된 것들을 처리할 건지 처리하지 않을 건지 결정해 주는 파라미터로 default가 True. True인 경우 이전 작업을 실행해 주고, False인 경우 실행 안 된 것들은 무시한다.

4) end_date

  • 이 값은 보통은 설정해 주지 않지만 backfill을 날짜 범위에 대해서 하는 경우 사용한다.


🔎 어려웠던 내용 & 새로 알게 된 내용

📌 과제 - airflow.cfg (풀이)

1. DAGs 폴더는 어디에 지정되는가?

  • Airflow가 설치된 디렉토리 밑의 dags가 폴더가 되며 dags_folder 키에 저장된다.
    2. DAGs 폴더에 새로운 Dag를 만들면 언제 실제로 Airflow 시스템에서 이를 알게 되나? 이 스캔 주기를 결정해주는 키의 이름이 무엇인가?
  • dag_dir_list_interval이 키가 되고, 기본 값은 300 (초) 즉, 5 분이다.
    3. 이 파일에서 Airflow를 API 형태로 외부에서 조작하고 싶다면 어느 섹션을 변경해야 하는가?
  • api 섹션의 auth_backendairflow.api.auth.backend.basic_auth로 변경해야 한다. airflow.api.auth.backend.basic_auth란 아이디와 패스워드로 인증하는 방법이다.
    4. Variable에서 변수의 값이 encrypted가 되려면 변수의 이름에 어떤 단어들이 들어가야 하는데 이 단어들은 무엇일까?
  • 이 부분은 이해를 잘못했었는데 진짜로 암호화를 하는 게 아니라 variable에 값을 *로 표시되게 만들어 주는 방법을 찾는 게 해당 문제였다.
    password, secret, passwd, autorization, api_key, apikey, access_token과 같은 단어를 써 주어야 한다.

    5. 이 환경 설정 파일이 수정되었다면 이를 실제로 반영하기 위해서 해야 하는 일은?
  • sudo systemctl restart airflow-webserver 혹은 sudo systemctl restart airflow-scheduler 같은 명령으로 재실행을 해 준다.
    6. Metadata DB의 내용을 암호화하는데 사용되는 키는 무엇인가?
  • fernet key를 사용해 준다.

📌 과제 - UpdateSymbol_v2의 Incremental Update 방식을 수정하기

  • ROW_NUMBER 방식을 사용해서 Primary Key가 동일한 레코드들을 처리하도록 수정


✍ 회고

- airflow 과정을 진행하면서 궁금한 부분들이 많았고, 해당 부분에 대해서 강사님께 물어보며 해답을 찾은 부분이 많았다. 그래서 새로 알게 된 부분들은 따로 포스팅을 해 두면 좋지 않을까 싶어 45 일자까지 강의를 다 듣고 정리가 끝나면 정리해 볼 예정이다.

profile
송의 개발 LOG

0개의 댓글