[TIL 52일자] 데브코스 데이터엔지니어링

·2023년 6월 22일
0

데브코스

목록 보기
48/55
post-thumbnail

📚 오늘 공부한 내용

1. 구글 시트 -> Redshift 연동하기

1) 구글 account 생성

  • 시트 API가 활성화되면 Credentials 선택
  • +CREATE CREDENTIALS를 선택 후 Service Account 선택
  • API Keys 화면에서 방금 생성한 Service Account 선택
  • 프로젝트명의 Keys 메뉴 선택
  • Private Key 생성 화면에서 JSON을 선택하고 해당 파일을 다운로드
  • Airflow의 Variable로 등록 (실습에서는 google_sheet_access_token으로 설정)

2) 구글 시트를 테이블로 복사하기 위해 시트에 이메일 공유

  • 스프레드 시트에서 Share를 통해 Service Account 이메일 공유. (iam.gserviceaccount.com이 포함된 이메일)

3) S3 Connection 설정

  • Admin -> Connection
  • conn id: aws_conn_id
  • conn type: S3나 AMAZON WEB SERVICE나 GENERIC 선택
  • Extra: `{ "region_name": "ap-northeast-2"}

4) 복제할 테이블 생성

  • 구글 Colab을 통해 스프레드 시트의 내용을 복사할 테이블을 CREATE 해 준다.
  • 테이블명은 spreadsheet_copy_testing로 생성한다. (실습 기준)
CREATE TABLE SSONG_JI_HY.SPREADSHEET_COPY_TESTING(
	  COL1 INT
	, COL2 INT
    , COL3 INT
    , COL4 INT
)

5) PythonOperator를 통해 COPY

  • 스프레드 시트의 url을 통해 호출해 준다.
  • import gspread를 통해 구글 스프레드 시트와 연동하기 위해 사용된다.
  • gsheet.py 모듈을 생성하는데 이때 스프레드 시트를 읽어 JSON 파일을 DATA_DIR 밑에 생성하고 이를 통해 구글 시트를 조작할 수 있는 OBJECT를 생성해 준다.

sheets = [
    {
        "url": "https://docs.google.com/spreadsheets/d/1hW-_16OqgctX-_lXBa0VSmQAs98uUnmfOqvDYYjuE50/",
        "tab": "SheetToRedshift",
        "schema": "ssong_ji_hy",
        "table": "spreadsheet_copy_testing"
    }
]

for sheet in sheets:
    download_tab_in_gsheet = PythonOperator(
        task_id = 'download_{}_in_gsheet'.format(sheet["table"]),
        python_callable = download_tab_in_gsheet,
        params = sheet,
        dag = dag)

    s3_key = sheet["schema"] + "_" + sheet["table"]

    copy_to_s3 = PythonOperator(
        task_id = 'copy_{}_to_s3'.format(sheet["table"]),
        python_callable = copy_to_s3,
        params = {
            "table": sheet["table"],
            "s3_key": s3_key
        },
        dag = dag)

    run_copy_sql = S3ToRedshiftOperator(
        task_id = 'run_copy_sql_{}'.format(sheet["table"]),
        s3_bucket = "grepp-data-engineering",
        s3_key = s3_key,
        schema = sheet["schema"],
        table = sheet["table"],
        copy_options=['csv', 'IGNOREHEADER 1'],
        method = 'REPLACE',
        redshift_conn_id = "redshift_dev_db",
        aws_conn_id = 'aws_conn_id',
        dag = dag
    )

2. Redshift -> 구글 시트 연동하기

  • 적절한 테이블을 선택해야 한다. 너무 데이터가 많은 테이블의 경우 속도 이슈가 발생할 수 있다.

1) DAG

  • SELECT * FROM analytics.nps_summary 쿼리문을 update_gsheet에 넘겨 준다.
  • 해당 내용을 plugin에 있는 gsheet.py로 넘겨 주어 구글 시트에 연동될 수 있도록 처리해 준다.
from airflow import DAG
from airflow.operators.python import PythonOperator

from plugins import gsheet
from datetime import datetime

def update_gsheet(**context):
    sql = context["params"]["sql"]
    sheetfilename = context["params"]["sheetfilename"]
    sheetgid = context["params"]["sheetgid"]

    gsheet.update_sheet(sheetfilename, sheetgid, sql, "redshift_dev_db")


with DAG(
    dag_id = 'SQL_to_Sheet',
    start_date = datetime(2022,6,18),
    catchup=False,
    tags=['example'],
    schedule = '@once'
) as dag:

    sheet_update = PythonOperator(
        dag=dag,
        task_id='update_sql_to_sheet1',
        python_callable=update_gsheet,
        params = {
            "sql": "SELECT * FROM analytics.nps_summary",
            "sheetfilename": "spreadsheet-copy-testing",
            "sheetgid": "RedshiftToSheet"
        }
    )

2) update_sheet를 gsheet.py에 생성

  • PostgreHook을 사용해 postgres 환경과 연동한다.
  • pandas를 통해 SELECT 한 내용을 DataFrame으로 생성하고 이를 스프레드 시트(worksheet)로 출력한다.
  • 이 시트 내용을 BULK UPDATE 한다.
def update_sheet(filename, sheetname, sql, conn_id):
    client = get_gsheet_client()
    hook = PostgresHook(postgres_conn_id=conn_id)
    sh = client.open(filename)
    df = hook.get_pandas_df(sql)
    print(sh.worksheets())
    sh.worksheet(sheetname).clear()
    add_df_to_sheet_in_bulk(sh, sheetname, df.fillna(''))


3. Airflow API 활성화

1) config 및 yaml 설정

  • airflow.cfg의 API Section에서 auth_backend 값을 변경한다.
  • 또한 VPN을 설정해 놓고 VPN을 통과하지 못한 사람은 보지 못하도록 처리하는 것이 좋다. public하면 위험에 노출되기 쉽다.
  • docker-compose.yaml에는 이미 설정되어 있다. (environments 밑의 `AIRFLOW__API_AUTH_BACKENDS: ``)
  • AIRFLOW__Section__KEYAIRFLOW의 Section에서 KEY에 해당하는 내용을 오버라이딩 하는 것이다. 복수형인 경우는 KEY에 S를 붙여 주며 ,를 통해 이어 준다.
  • docker exec -it scheduler_container_name airflow config get-value api auth_backend 해당 명령을 실행해 보면 위에서 설정한 값이 나온다.

2) API 사용자 생성

  • Airflow Web UI에서 새로운 사용자를 추가할 수 있다.
    • Security -> List Users -> +
    • 새 사용자 정보 추가 (monitor: password)
    • Role은 User로 설정 (airflowadmin 계정이기 때문에 따로 유저 계정을 만드는 것이 좋다.)

3) Health API 호출

  • /health API 호출
    - `curl -X GET --user "user_name:password" http://localhost:8080/health"
    • 정상 경우인 경우 metadatabasestatusschedulerstatus를 확인할 수 있다.
    • 최근 실행 일자를 알 수 있다. `latest_scheduler_heartbeat"'
  • 📚 airflow API

4. API의 예

  • 특정 DAG를 API로 Trigger

    • POST 명령이 되어야 한다. DAG를 실행하는 명령들은 execution_date가 필요하며 이를 data로 POST에 데이터 쪽에 저장해 두기 위해 사용된다.
    • curl -X POST --user "airflow:airflow" -H 'Content-Type: application/json' -d '{"execution_date":"2023-05-24T00:00:00Z"}' "http://localhost:8080/api/v1/dags/HelloWorld/dagRuns"
  • 모든 DAG 리스트

    • curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/dags
    • 모든 DAG가 RETURN 된다.
    • is_active가 True인데 is_paused가 True인 경우가 존재한다. is_active는 코드가 DAGs 폴더에 존재한다면 True이고 is_paused는 아직 활성화가 되어 있지 않은 경우 True이다.
    • statsqueued면 실행이 됐다는 뜻이다.
  • DAGS를 조회할 수 있는 Python 코드

import requests
from requests.auth import HTTPBasicAuth

url = "http://localhost:8080/api/v1/dags"
dags = requests.get(url, auth=HTTPBasicAuth("airflow", "airflow"))
print(dags.text) #dags.json
  • 모든 Variable 리스트 하기
    • curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/variables
    • list 내에는 total_entries만큼의 Variable이 존재한다.
    • Connection도 Format은 동일하다.
    • 주기적으로 Variable을 백업하고 싶다면 API를 주기적으로 불러 저장하는 방법을 사용할 수 있다.
  • 모든 Config 리스트 하기
    • curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/config
    • admin 계정으로 액세스해도 기본적으로 막혀 있기 때문에 403 Forbidden 오류가 발생한다.
    • config API 엔드 포인트는 기본으로 disable 되어 있다.
    • airflow.cfg에서 이를 풀어 줄 수 있는 key가 존재한다. 이 키를 찾아 AIRFLOW Section Key를 docker-compose.yaml에서 오버라이딩 해 주어야 한다.
  • Variables/Connections Import/Export
    • DB에 기록이 돼서 Web UI에 보이는 Variables와 Connections만 볼 수 있고, 환경 변수로 등록된 것은 볼 수 없다.
    • airflow variables export variables.json
    • airflow variables import variables.json
    • airflow connections export connections.json
    • airflow connections import connections.json


🔎 어려웠던 내용 & 새로 알게 된 내용

📌 과제

  • 활성화된 DAGS만 출력해 주는 Python 코드를 작성
    - is_paused가 False인 값만 ID 출력 (GITHUB로 PR)
  • config API의 경우 Access가 거부당했는데 이를 컨트롤 해 주는 KEY가 무엇인지와 그 키를 docker-compose.yaml에 어떻게 적용해야 하는지 (슬랙 DM 제출)
  • connections API와 variables API는 환경 변수도 리턴하는지 아닌지 (슬랙 DM 제출)


✍ 회고

profile
송의 개발 LOG

0개의 댓글