데이터 엔지니어링 스터디 5주차

skh951225·2023년 4월 22일
0

들어가기 앞서

질문

<질문 1>
데이터 처리를 airflow worker에서 돌리는 방법, airflow가 특정 목적을 가진 cluster에 job을 전달하는 방식 어떤 방식이 보통 쓰이나요?

  • 후자가 일반적

<질문 2>
오늘의 집 데이터팀에서는 쿠버네티스를 사용하던데 어떻게 하는 건가요?

  • kubernetes는 컴퓨팅 자원을 중앙에 모아놓고 각 팀이 필요한 만큼 자원을 끌어쓰고 다 사용하면 return 하는 방식이기 때문에 컴퓨팅 자원의 이용률을 높일 수 있는 장점이 있습니다. airflow에서는 KuberentesExecutor를 쓰는 방법, KubernetesPodOperator 쓰는 방법이 있는데 자세한 내용은 다음주에...

<질문 3>
Incremental update에서 CDC를 구현할때 최근에는 Timestamp나 version 기록 말고, 트랜잭션 로그를 통해 구현하는 경우도 있다고 알고 있습니다.

  • CDC(Change data capture)는 데이터의 변경 내역을 추적하고, 해당 변경 사항만을 캡처하여 이후에 처리하는 기술을 말한다.
  • Timestamp, version
    • data에 modified_at, deleted column을 추가하여 더 최근에 변경된 내용을 반영할 수 있다.
  • Transaction log
    • 보통 Database의 low level에서는 Transaction log가 생긴다. 테이블에 변화가 생기면 그 변화를 Transaction log 파일에 저장을 한다.
    • Transaction log를 streaming 형태로 데이터 웨어하우스에 보내서 반영하게 할 수 있다.
    • 클라우드에서 사용하려면 비용이 조금 비싸다. 회사가 그 비용을 감당할 수 있느냐? 그 비용을 감당할 수 있을만큼 가치가 있느냐에 따라 다르다.
    • 만약 테이블 전체에 대한 복사작업을 수행해야한다면 이 방법은 오버헤드가 크다. 이 방법이 꼭 깔끔하게 동작하지는 않는다.

<질문 4>
airflow만을 사용해서 ETL, test를 구현할 수 있는데 굳이 dbt를 사용하는 이유는 무엇일까요?

  • CTAS 써서 새로운 테이블을 만들고 configuration 형태로 앞뒤에 테스트를 구현할 수 있다.
  • 하지만 dbt는 이러한 작업을 쉽게 해준다. 역사가 오래된 오픈소스이고 이러한 작업을 많이 한 사람들이 만든 것이기때문에 철학 자체가 굉장히 잘 만들어져 있고 테스트, 문서화, 테이블간의 리니지(혈통)을 ERD 형태로 쉽게 만들어준다.
    • lineage : 특정 테이블을 생성하기 위해 어떤 테이블들을 사용했는지, 특정 테이블을 활용하여 만든 테이블이 어떤 것이 있는지

<질문 5>
SQL query 퍼포먼스를 미리 측정할 수 있는 방법이 어떤것이 있나요?
-Explain SELECT .. 실행해보면 query plan을 보여줌

  • query plan과 테이블의 크기를 고려하면 얼추 예측할 수 있다. 하지만 Join, groupby하려는 값의 분포에 의해서 많이 달라질 수 있다.

<질문 5>
Dag에 task를 어떻게 구성하는것이 좋나?

  • 전체 작업을 하나의 테스크에 몰아 넣는 방법, 테스크를 세분화 하는 방법 각자의 장단점이 있다.
  • 테스크를 너무 잘게 나누면
    • scheduler가 그만큼 schedule을 많이해야해서 그에 대한 오버헤드가 발생
    • 하지만 해당 dag가 중간에 실패하면 일부만 테스크만 재실행해도 되서 시간이 덜 걸린다.
  • 하나의 테스크에 모든 job을 몰면
    • scheduling 에대한 부담은 없다.
    • 하지만 해당 dag가 중간에 실패하면 처음부터 시작해야한다. 재실행시 시간이 오래걸린다.

숙제 리뷰

  • dag의 실행주기를 나타내는 파라미터 schedule_interval, schedule를 모두 사용가능하다. schedule_interval은 곧 사라질 예정이기 때문에 schedule을 쓰는 것이 권장됨
  • Variable도 신중히 사용되어야함. 만약 sql 문까지 전부 variable로 저장하면 sql이 어떻게, 왜 변경되었는지 보이지 않기 때문에 서비스에 문제가 생길 수 있다. 중요한 로직과 관련된 것은 코드의 일부로 들어가야 코드리뷰, 디버깅에 있어 수월하다.
  • CREATE TABLE {table1} LIKE {table2}
    • table2의 schema를 가지는 table1으로 가지고 올 수 있음
    • additional properties를 가져오려면 INCLUDING 절을 사용해야함
    • 기본적으로는 additional properties는 가져오지 않는 EXCLUDING이 default

Airflow quiz

Q1. Airflow의 환경 설정이 들어있는 파일의 이름은?

  • airflow.cfg

Q2. 이 파일에서 Airflow를 API 형태로 외부에서 조작하고 싶다면 어느 섹션을 변경해야하는가?

  • airflow.cfg의 [api] 섹션의 auth_backend를 airflow.api.auth.backend.basic_auth로 변경
  • 이러한 API을 통해 webserver, scheduler에 대한 monitoring을 걸 수 있다. API로 Health check할 수 있는 endpoint가 있는데 이것을 통해 scheduler가 살았나 죽었나? 마지막 heartbeat는 언제인가?

Q3. Variable에서 변수의 값이 encrypted가 되려면 변수의 이름에 어떤 단어들이 들어가야 하는데 이 단어들은 무엇일까? :)

  • password, secret, passwd, authorization, api_key, apikey, access_token

Q4. 이 환경 설정 파일이 수정되었다면 이를 실제로 반영하기 위해서 해야 하는 일은?

  • airflow 웹 서버와 스케줄러를 다시 시작해야 합니다.
  • DB에 대한 변경사항이 있다면 db init을 하면된다. 하지만 이 겨우 web UI로 설정해주었던 variable, connection 그리고 dag 실행기록이 모두 날아가니 주의해야한다.

Q5. DAGs 폴더에 새로운 Dag를 만들면 언제 실제로 Airflow 시스템에서 이를 알게 되나? 이 스캔주기를 결정해주는 키의 이름이 무엇인가?

  • airflow.cfg의 [schedule] 섹션의 dag_dir_list_interval을 변경하면 된다.(초 단위, default 300)
  • commandline으로 airflow dags list를 하게되면 바로 반영된다. 하지만 웹서버로 접속하면 최대 5분을 기다려야할 수도 있다.

dags 폴더에서 코딩시 작성한다면 주의할점

  • airflow.cfg의 [core] 섹션의 dags_folder=/var/lib/airflow/dags 로 airflow의 dag 파일이 저장되는 위치를 지정할 수 있따.
  • airflow는 주기적으로 이 폴더를 스캔하여 airfow.DAG 모듈을 import하고 있는 파일을 실행해보고(디버깅 차원) dags list에 반영한다.
  • 여기서 주의해야할 점은 python 파일을 실제로 실행해본다는 것이다. 실수로 dags폴더에 table을 날리는 test코드를 넣어뒀는데 우연히 이 파일에 airflow.DAG 모듈이 import되어 있다면 5분마다 테이블이 날라가는 마법을 경험하게 될것이다...
from airflow import DAG
...
cur.execute(“DELETE FROM ….”)
  • 현업에서 airflow를 운영할때 3가지 환경을 구성해야한다.
    • airflow 프로덕션 환경
      • 배포 프로세스에 의해서 깃헙 레파지토리에서 테스트 통과한 것이 들어가는 곳
      • 비상시가 아닐때는 개발자가 login 하면 안됨
    • airflow test 환경
      • 왠만하면 test환경과 최대한 비슷하게
      • 읽기 작업을 할때 테이블, api를 프로덕션 환경이랑 동일하게 갈 수 있음
      • 쓰기 작업을 할때는 테스트용 스키마를 따로 만들어서 사용
    • 개인 개발환경
    • 다음주에 한번 더 언급할 예정, 기회가 된다면 현업자들에게 각 회사에서 어떤 방법을 사용하는지 tech talk을 해보려고 한다.
  • 에러가 발생하였을때
    • Web UI에서 Log를 확인해보기
    • airflow test 환경에 접속해서 dag를 실행해 보기

Airflow로 ETL, ELT

Production DB(OLTP) vs OLAP

  • MySQL(OLTP)는 보통 하나의 서버에 구성하면 하나의 서버를 scale up하는 것은 한계가 있다.
    • 메모리 400G에 굉장히 빠른 SSD를 구축하는 것이 한계
  • 그래서 MySQL(OLTP)에는 서비스 운영에 필요한 데이터만을 넣어야한다.
  • 분석을 위한 데이터는 Redshift(OLAP)에 저장해야한다.
  • 유데미에 처음 합류했을 때 가장 만저한 일은 백엔드 엔지니어와 MySQL(OLTP)에 담긴 불필요한 정보를 빼내서 데이터 웨어하우스에 보내는 일을 했다.
    • 유데미에서는 누가, 언제, 어떤 비디오를, 어디까지 봤는지 15초마다 플레이어가 서버에 전송
    • 사용자가 늘어나고 비디오의 수가 늘어나면 엄청난 수의 데이터가 생성된다.
    • 이러한 데이터를 MySQL에 쏘고 있었다. 미친거지...
  • 이러한 분석용 데이터는 Disk에 읽어오고 Redshift에 주기적으로 적제
  • 보통 Redshift로 보내지는 데이터의 상당부분이 프로덕션 데이터베이스로 부터 온다.
  • 이런 걸 CDC로 세팅을 하면 알아서 sync를 해줌 가격도 비싸고 오버헤드도 많다. 테이블의 내용이 다 바뀐다? 오래걸림

MySQL(OLTP) -> Redshift(OLAP) 데이터 복사 ETL

  • MySQL에서 Redshift로 데이터를 Insert 하는 방식은 크게 2가지가 있다.
  • 데이터의 크기가 작은 경우
    • MySQL → Local → Redshitf
    • 이전까지 써오던 방식으로 한줄 한줄씩 Insert
  • 데이터의 크기가 큰 경우
    • MySQL → Local Disk → S3 → Redshitf
    • MySQL에서 데이터를 읽어서 Local Disk에 저장후 S3로 전송
      • 이때 데이터의 형식은 csv로 저장해서 압축하는 방법
      • Big data system에서 사용하는 binary format(Parquet, Avro)
    • S3에 저장된 파일을 Redshift르 COPY(bulk update)
    • S3는 파일의 분산 저장을 지원해서 업로드, Redshift로의 COPY를 굉장히 빠르게 할 수 있다.

Incremental Update를 하기 위해서

  • Production 데이터베이스에 각 record에 modified_at, deleted 컬럼이 있어야함
  • created_at(timestamp): optional
  • modified_at(timestamp): 최근 변경 날짜
  • deleted(boolean) : 레코드를 삭제하는 경우 redshift에게도 이 사실을 알려야해서 바로 삭제하면 안된다.

Backfill을 실행하기 위해선

  • 해당 dag의 catchup=True
  • execution_date을 사용해서 Incremental update가 구현되어 있어야함
  • 실행순서는 random 만약 날짜순으로 하고싶다면 default_args['depends_on_past']=True로 설정
  • max_active_runs를 1보다 크게 잡으면 sql_to_s3과정에서 consistency가 깨질 수도 있음. 이 경우 S3에 저장할 S3_key 값을 execution_date과 같은 값을 포함시켜 구분시켜줘야함

Summary table 생성(ELT)

  • schedule='once' : 내가 명시적으로 실행하거나, 다른 dag가 trigger를 해야만 실행
  • test를 configuration 형태로 만들면 굉장히 유용
    • 사용하는 테이블에 대한 input check
    • 결과 테이블에 대한 output check
    • 다음주에 확인 해봐야함
    • 이것을 범용적으로 만든 것이 dbt -> analytics engineer

추가적인 내용

  • 원래는 mysql_to_s3, oracle_to_s3 ... 처럼 DB마다 operator가 있었는데 이제는 sql_to_s3와 같이 통합적으로 지원함
  • sql_to_s3는 내부적으로 pandas 모듈을 사용해서
sudo pip3 install numpy
sudo pip3 install pandas
  • airflow 2.5.1은 불안정한 버전이라 모듈들간의 충돌이 존재
    • pip install pyopenssl --upgrade
  • Redshift가 S3에 접근하기 위해선 토큰이 필요하다.
  • Jinja template 어떤 매크로 상수 값을 코드상에서 expansion해주는 매커니즘
    • 보통 Jinja template의 변수들은 {{ var }} 와 같은 형태를 가짐
  • Backfill의 end date은 execution_date에 포함안됨
  • S3으로 쏴주는 파일의 형식
    • json을 사용하는 것은 모든 레코드들 마다 field이름이 들어가서 크기가 커져 좋지 않다.
    • 가능하면 binary format 으로 변환하여 S3에 저장하는게 좋음
      • 이러한 파일은 압축도 잘 되어 있고 중간에 끊어서 읽기도 좋게 되있다.
      • Spark과 같은 빅데이터 처리를 할때 binary format을 주로 사용한다.
      • parquet, Avro에는 스키마 정보가 들어가 있다
      • Avro는 row 단위 압축
      • parquet는 row group으로 나눈 다음 그 안에서 column별로 압축을 함. 압축률도 좋고 중간중간 끊어서 들어가기 쉬워서 빅데이터 처리에서 기본 파일 포맷은 parquet이다.
    • csv 파일을 쓸 경우 csv를 여러개로 나눠서 압축을 하는 것이 좋다.
      • S3는 분산 시스템이기때문에 병렬 업로드, 병렬 처리를 지원해서..

궁금

  • max_active_runs를 1보다 크게하고 depends_on_past를 걸면?
  • MySQL의 COPY command로 테이블을 파일의 형태로 저장하고 S3 업로드?..
    01:55:00 부터 다시

0개의 댓글