pj2-2.airflow mongodb 적재 + google storage 적재

hyeok2·2023년 8월 29일
0

1. 구축환경

  • GCP e2 medium은 airflow docker-compose를 지원하기에 부족하여 .5를 늘려주었다.
  • ubuntu

2. 덱 정보

3개의 사이트에서 4개의 정보를 가져오고 -> mongodb에 적재 -> 적재되면서 생성된 _id값으로 Google storage에 이미지 저장

  • 위의 흐름대로 진행이 되어야했으며, 영화관 사이트를 크롤링하는 과정에서 같은 영화를 중복 값이 생기는 경우도 있었다.
  • xcom으로 push하고 pull하는 이유
  • airflow에는 작업간 데이터를 주고받는 과정에서 xcom을 이용한다.

why xcom?

  • mongodb에 적재된 _id값을 기준으로 구글스토리지에 저장을 하기 때문에 해당 저장이 어떤 _id로 되었는지 확인하고 적재하려면 mongodb를 다시 조회해야하며, 언제 적재가된건지에 대한 키 value 데이터도 추가해야하기에 task 처리과정이 복잡해지고 수고로움이 생기게 되었다.

dag파일

  • 덱파일은 다음과 같다.
  • 각각의 작업들을 파이썬 파일을 생성하여 덱 파일을 최대한 간결하게 하고자 하였다.
  • 크롤링덱은 cgv_scraper, mega_scraper, Netfl_watcha_scraper로 만들었으며, google_cloud_storage_upload로 포스터 이미지 저장하는 코드를 작성하였다.
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from cgv_scraper import fetch_cgv_movies, save_cgv_movies
from mega_scraper import fetch_mega_movie, save_mega_mongo
from Netfl_watcha_scraper import update_movies_for_collections
from google_cloud_storage_upload import upload_movie_poster_to_gcs
#.
default_args = {
    "owner": "pshyeok",
    "retries": 3,
    "retry_delay": datetime.timedelta(minutes=3),
    "start_date": datetime.datetime(2023, 8, 8),
}
#dag
dag = DAG(
    dag_id="movie_scraper_dag",
    default_args=default_args,
    description="Scrape movie data and upload images to Google Cloud Storage",
    schedule_interval=datetime.timedelta(days=1),
    catchup=False,
    max_active_runs=1,
)
#cgv
def cgv_scraper(**kwargs):
    cgv_movies = fetch_cgv_movies()
    cgv_mongo = save_cgv_movies(cgv_movies)
    kwargs['ti'].xcom_push(key='cgv_mongo', value=cgv_mongo)
#megabox
def mega_scraper(**kwargs):
    merged_movies = fetch_mega_movie()
    mega_mongo = save_mega_mongo(merged_movies)
    kwargs['ti'].xcom_push(key='mega_mongo', value=mega_mongo)   
#netflix,watcha
def netflix_watcha_scraper(**kwargs):
    net_mongo = update_movies_for_collections(urls)
    kwargs['ti'].xcom_push(key='net_mongo', value=net_mongo)
#포스터이미지 적재하기 위한 작업 
def merge_and_process_movies(**kwargs):
    cgv_movies = kwargs['ti'].xcom_pull(key='cgv_mongo')
    mega_movies = kwargs['ti'].xcom_pull(key='mega_mongo')
    netflix_watcha_movies = kwargs['ti'].xcom_pull(key='net_mongo')
    all_movies = cgv_movies + mega_movies + netflix_watcha_movies
    print(all_movies)
    kwargs['ti'].xcom_push(key='all_movies', value=all_movies)  # 수정된 부분
#구글스토리지에 저장
def google_cloud_storage_upload(**kwargs):
    all_movies = kwargs['ti'].xcom_pull(key='all_movies') # Merge된 데이터 가져오기
    upload_movie_poster_to_gcs(all_movies)  # all_movies 데이터를 함수 인자로 전달
#파이썬 함수를 오퍼레이터로 정의
cgv_scraper_task = PythonOperator(
    task_id="cgv_scraper",
    python_callable=cgv_scraper,
    dag=dag,
)
#메가박스 스크랩
mega_scraper_task = PythonOperator(
    task_id="mega_scraper",
    python_callable=mega_scraper,
    provide_context=True,
    dag=dag,
)
#넷플릭스,왓챠 스크랩
netflix_scraper_task = PythonOperator(
    task_id="netfl_watcha_scraper",
    python_callable=netflix_watcha_scraper,
    provide_context=True,
    dag=dag,
)
#병합하기
merge_and_process_movies = PythonOperator(
    task_id="merge_and_process_movies",
    python_callable=merge_and_process_movies,
    dag=dag,
)
#구글스토리지 적재
google_cloud_storage_upload_task = PythonOperator(
    task_id="google_cloud_storage_upload",
    python_callable=google_cloud_storage_upload,
    provide_context=True,
    dag=dag,
)
#작업 순서 설정
cgv_scraper_task >> mega_scraper_task >> netflix_scraper_task >> merge_and_process_movies>> google_cloud_storage_upload_task

3. 각 덱에서 불러온 작업

mega_scraper.py

  • 크롤링 과정은 길게 불러오기 때문에 하나만 적으려고 한다.
  • 최대한 기존 db와 키 밸류를 통일하고자 설정해놓았다.
  • 메가박스 데이터는 한페이지에 다 나오지 않아서 movieNo리스트에 각 영화의 url을 추가, 추가된 url을 반복문을 돌려 각 영화의 상세 정보를 가져오게 하였다.
  • mongodb에 적재한 이후 삽입된 문서의 _id값을 따로 mega_new_movie라는 리스트에 추가한 후 return을 하여 xcom으로 값을 받을 수 있게 하였다.
from datetime import datetime, timedelta
import pymongo
import requests
from bs4 import BeautifulSoup as BS
import re
import connection
def fetch_mega_movie():
    url = 'https://www.megabox.co.kr/on/oh/oha/Movie/selectMovieList.do'
    url2 = 'https://www.megabox.co.kr/on/oh/oha/Movie/selectMovieInfo.do'
    payload = {
        "currentPage": "1",
        "recordCountPerPage": "200",
        "pageType": "rfilmDe",
        "ibxMovieNmSearch": "",
        "onairYn": "MSC02",
        "specialType": "",
        "specialYn": "N"
    }
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36"
    }


    response = requests.post(url, data=payload, headers=headers)
    data = response.json()
    movie_list = data['movieList']
    formatted_movies = []

    movieNo = [] #영화 줄거리 등을 위한 리스트
        
    for movie in movie_list:
        released_date = movie['rfilmDeReal']
        movieNo.append(movie['movieNo'])
        formatted_movie = {
            "movieNo" : movie['movieNo'],
            "title_kr": movie['movieNm'],
            "released_At": datetime.strptime(released_date, "%Y%m%d").strftime("%Y-%m-%d") if released_date else None,
            "poster_image_url": f"https://img.megabox.co.kr/{movie['imgPathNm'][1:]}" if movie.get('imgPathNm') else None, 
            "OTT": ["Mega"],
            "rating": "UNKNOWN",
            "media": "MOVIE",
            "comming": "TRUE",
            "moochu" :  re.sub(r'[^\w\s]', '',movie['movieNm']).replace(' ', '')
            }
        if not formatted_movie['released_At']:
            continue
        
        if movie['admisClassNm'] == '12세이상관람가':
            formatted_movie['rating'] = 'OVER12'
        elif movie['admisClassNm'] == '15세이상관람가':
            formatted_movie['rating'] = 'OVER15'
        elif movie['admisClassNm'] == '18세이상관람가':
            formatted_movie['rating'] = 'OVER18'
        elif movie['admisClassNm'] == '전체관람가':
            formatted_movie['rating'] = 'ALL'
        formatted_movie['synopsis'] = movie['movieSynopCn'].replace("\n", "<br>")
        formatted_movies.append(formatted_movie)
        
        
    # 영화장르, 등 데이터 
    total2 = []
    for code in movieNo:
        payload3 = {"rpstMovieNo": code}
        r = requests.post(url2, data=payload3)
        info = BS(r.text).select('div.line p')
        info = BS(r.text).select('div.movie-info p')
        movieDic = {
            'movieNo': code,
            'directors': "",
            'genres': "",
            'actors': "",
            'coming':"TRUE"
        }
        
        for i in info:
            direc = []
            genre = []
            actor = []
            if '감독' in i.text:
                for a in i.text.split(':')[1].split(','):
                    direc.append(a.strip())
                movieDic['directors'] = direc
            if '장르' in i.text:
                for a in i.text.split(':')[1].split('/')[0].split(','):
                    genre.append(a.strip())
                movieDic['genres'] = genre
            if '출연진' in i.text:
                for x in i.text.split(':')[1].split(','):
                    actor.append(x.strip())
                movieDic['actors'] = actor
        total2.append(movieDic)
        
    # movieNo로 두 데이터를 비교 후 merge
    merged_movies = []
    for movie1 in total2:
        for movie2 in formatted_movies:
            if movie1['movieNo'] == movie2['movieNo']:
                merged_movie = {**movie1, **movie2}
                merged_movies.append(merged_movie)
                break
    return merged_movies

# db 저장여부 확인후 db 적재
            
def save_mega_mongo(merged_movies):
    collection_name = "movies2"
    collection = db[collection_name]

    for movie in merged_movies:  # 메가박스 정보 2개 합친 것.
        existing_movie = collection.find_one({'moochu': movie['moochu']})
        mega_new_movie=[]
        if existing_movie:
            if set(movie['OTT']) <= set(existing_movie.get('OTT', [])):
                continue
            else:
                unique_OTTs = list(set(movie['OTT']) - set(existing_movie.get('OTT', [])))
                collection.update_one({'moochu': movie['moochu']}, {'$addToSet': {'OTT': {'$each': unique_OTTs}}})
        else:
            movie['OTT'] = list(set(movie.get('OTT', [])))  # Initialize as an array if not present
            collection.insert_one(movie)

            insert_result = collection.insert_one(movie)
            
            # 삽입된 문서의 _id 값을 영화 사전에 추가합니다.
            movie['_id'] = insert_result.inserted_id
            mega_new_movie.append(movie)
    return mega_new_movie

4. google storage 포스터 이미지 저장

  • 마지막 task로 이 작업을 해놓은건 각각의 task가 mongodb에 제대로 적재된 이후 가능한 작업이기 때문이다.
  • pymongo로 몽고db에 적재된 데이터를 가져오고
  • 해당 데이터를 jpeg 파일로 저장하는 방식으로 진행하였다.
from pymongo import MongoClient
import requests
from google.cloud import storage
import os
from io import BytesIO
import connection

def upload_image_url_to_gcs(bucket_name, url, destination_blob_name):
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    response = requests.get(url)

    with BytesIO(response.content) as f:
        blob.upload_from_file(f, content_type='image/jpeg')


def upload_movie_poster_to_gcs(all_movies):
    collection_name = "movies2"
    collection = db[collection_name]

    # 연결된 도큐먼트들에서 필요한 정보들 수집
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = './river-dynamo-393506-f834b1182f86.json'

    # 수집한 정보를 이용하여 이미지 파일을 Google Cloud Storage에 업로드합니다.
    for movie in all_movies:
        movie_id = movie['movie_id']
        poster_image_url = movie['poster_img']
        bucket_name = 'end_moochu'
        destination_name = f'{str(movie_id)}.jpg'
        upload_image_url_to_gcs(bucket_name, poster_image_url, destination_name)
        print(destination_name)

5. slack api 설정

  • 작업이 제대로 실행되었는지 확인하는데는 꼭 필요한 slack api
  • 시간 관계상 slack api는 추천모델을 돌릴때 제대로 실행되는지에만 적용해보았다. 하지만 똑같은 위치에 파일을 두었기에 import 만 해주면 추가된다.
    https://velog.io/@pshyeok2/airflow-slack-api-%EC%95%8C%EB%A6%BC

6. 소감

  • 팀원들이 짜놓은 크롤링 코드가 제각각이었고, 처음에 어떤 데이터를 가져올 수 있을지 몰라서 각자 중요하다고 생각한 값 위주로 키 밸류 값을 설정해놓았었다.
  • 이 과정에서 다시 크롤링을 해야하는 문제가 발생했고, 3~4개의 크롤링 코드를 다시 짜야하는 아주 기분좋은 경험을 하였다.
  • airflow 설정시 빌드를 하였음에도 원하는 값들이 뜨지 않는 문제가 있었다.
  • docker-compose.yml 파일을 다시 수정해서 이를 바로 잡았고, build가 실행되기 전에

7. 느낀점

  • 만약 크롤링을 해야한다면 가져올 수 있는 값들은 모조리 가져온 이후 나중에 필요한 데이터를 추려서 사용해야겠다고 생각했다.
  • airflow의 dag을 짜는 연습을 자주 할 필요가 있다.(아직 xcom이 어렵다)
profile
땅을 파다보면 흙과 물을 보겠지만, 코드를 파다보면 답이 보일것이다.

0개의 댓글