1. 구축환경

- GCP e2 medium은 airflow docker-compose를 지원하기에 부족하여 .5를 늘려주었다.
- ubuntu
2. 덱 정보
3개의 사이트에서 4개의 정보를 가져오고 -> mongodb에 적재 -> 적재되면서 생성된 _id값으로 Google storage에 이미지 저장
- 위의 흐름대로 진행이 되어야했으며, 영화관 사이트를 크롤링하는 과정에서 같은 영화를 중복 값이 생기는 경우도 있었다.
- xcom으로 push하고 pull하는 이유
- airflow에는 작업간 데이터를 주고받는 과정에서 xcom을 이용한다.
why xcom?
- mongodb에 적재된 _id값을 기준으로 구글스토리지에 저장을 하기 때문에 해당 저장이 어떤 _id로 되었는지 확인하고 적재하려면 mongodb를 다시 조회해야하며, 언제 적재가된건지에 대한 키 value 데이터도 추가해야하기에 task 처리과정이 복잡해지고 수고로움이 생기게 되었다.
dag파일
- 덱파일은 다음과 같다.
- 각각의 작업들을 파이썬 파일을 생성하여 덱 파일을 최대한 간결하게 하고자 하였다.
- 크롤링덱은 cgv_scraper, mega_scraper, Netfl_watcha_scraper로 만들었으며, google_cloud_storage_upload로 포스터 이미지 저장하는 코드를 작성하였다.
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from cgv_scraper import fetch_cgv_movies, save_cgv_movies
from mega_scraper import fetch_mega_movie, save_mega_mongo
from Netfl_watcha_scraper import update_movies_for_collections
from google_cloud_storage_upload import upload_movie_poster_to_gcs
default_args = {
"owner": "pshyeok",
"retries": 3,
"retry_delay": datetime.timedelta(minutes=3),
"start_date": datetime.datetime(2023, 8, 8),
}
dag = DAG(
dag_id="movie_scraper_dag",
default_args=default_args,
description="Scrape movie data and upload images to Google Cloud Storage",
schedule_interval=datetime.timedelta(days=1),
catchup=False,
max_active_runs=1,
)
def cgv_scraper(**kwargs):
cgv_movies = fetch_cgv_movies()
cgv_mongo = save_cgv_movies(cgv_movies)
kwargs['ti'].xcom_push(key='cgv_mongo', value=cgv_mongo)
def mega_scraper(**kwargs):
merged_movies = fetch_mega_movie()
mega_mongo = save_mega_mongo(merged_movies)
kwargs['ti'].xcom_push(key='mega_mongo', value=mega_mongo)
def netflix_watcha_scraper(**kwargs):
net_mongo = update_movies_for_collections(urls)
kwargs['ti'].xcom_push(key='net_mongo', value=net_mongo)
def merge_and_process_movies(**kwargs):
cgv_movies = kwargs['ti'].xcom_pull(key='cgv_mongo')
mega_movies = kwargs['ti'].xcom_pull(key='mega_mongo')
netflix_watcha_movies = kwargs['ti'].xcom_pull(key='net_mongo')
all_movies = cgv_movies + mega_movies + netflix_watcha_movies
print(all_movies)
kwargs['ti'].xcom_push(key='all_movies', value=all_movies)
def google_cloud_storage_upload(**kwargs):
all_movies = kwargs['ti'].xcom_pull(key='all_movies')
upload_movie_poster_to_gcs(all_movies)
cgv_scraper_task = PythonOperator(
task_id="cgv_scraper",
python_callable=cgv_scraper,
dag=dag,
)
mega_scraper_task = PythonOperator(
task_id="mega_scraper",
python_callable=mega_scraper,
provide_context=True,
dag=dag,
)
netflix_scraper_task = PythonOperator(
task_id="netfl_watcha_scraper",
python_callable=netflix_watcha_scraper,
provide_context=True,
dag=dag,
)
merge_and_process_movies = PythonOperator(
task_id="merge_and_process_movies",
python_callable=merge_and_process_movies,
dag=dag,
)
google_cloud_storage_upload_task = PythonOperator(
task_id="google_cloud_storage_upload",
python_callable=google_cloud_storage_upload,
provide_context=True,
dag=dag,
)
cgv_scraper_task >> mega_scraper_task >> netflix_scraper_task >> merge_and_process_movies>> google_cloud_storage_upload_task
3. 각 덱에서 불러온 작업
mega_scraper.py
- 크롤링 과정은 길게 불러오기 때문에 하나만 적으려고 한다.
- 최대한 기존 db와 키 밸류를 통일하고자 설정해놓았다.
- 메가박스 데이터는 한페이지에 다 나오지 않아서 movieNo리스트에 각 영화의 url을 추가, 추가된 url을 반복문을 돌려 각 영화의 상세 정보를 가져오게 하였다.
- mongodb에 적재한 이후 삽입된 문서의 _id값을 따로 mega_new_movie라는 리스트에 추가한 후 return을 하여 xcom으로 값을 받을 수 있게 하였다.
from datetime import datetime, timedelta
import pymongo
import requests
from bs4 import BeautifulSoup as BS
import re
import connection
def fetch_mega_movie():
url = 'https://www.megabox.co.kr/on/oh/oha/Movie/selectMovieList.do'
url2 = 'https://www.megabox.co.kr/on/oh/oha/Movie/selectMovieInfo.do'
payload = {
"currentPage": "1",
"recordCountPerPage": "200",
"pageType": "rfilmDe",
"ibxMovieNmSearch": "",
"onairYn": "MSC02",
"specialType": "",
"specialYn": "N"
}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36"
}
response = requests.post(url, data=payload, headers=headers)
data = response.json()
movie_list = data['movieList']
formatted_movies = []
movieNo = []
for movie in movie_list:
released_date = movie['rfilmDeReal']
movieNo.append(movie['movieNo'])
formatted_movie = {
"movieNo" : movie['movieNo'],
"title_kr": movie['movieNm'],
"released_At": datetime.strptime(released_date, "%Y%m%d").strftime("%Y-%m-%d") if released_date else None,
"poster_image_url": f"https://img.megabox.co.kr/{movie['imgPathNm'][1:]}" if movie.get('imgPathNm') else None,
"OTT": ["Mega"],
"rating": "UNKNOWN",
"media": "MOVIE",
"comming": "TRUE",
"moochu" : re.sub(r'[^\w\s]', '',movie['movieNm']).replace(' ', '')
}
if not formatted_movie['released_At']:
continue
if movie['admisClassNm'] == '12세이상관람가':
formatted_movie['rating'] = 'OVER12'
elif movie['admisClassNm'] == '15세이상관람가':
formatted_movie['rating'] = 'OVER15'
elif movie['admisClassNm'] == '18세이상관람가':
formatted_movie['rating'] = 'OVER18'
elif movie['admisClassNm'] == '전체관람가':
formatted_movie['rating'] = 'ALL'
formatted_movie['synopsis'] = movie['movieSynopCn'].replace("\n", "<br>")
formatted_movies.append(formatted_movie)
total2 = []
for code in movieNo:
payload3 = {"rpstMovieNo": code}
r = requests.post(url2, data=payload3)
info = BS(r.text).select('div.line p')
info = BS(r.text).select('div.movie-info p')
movieDic = {
'movieNo': code,
'directors': "",
'genres': "",
'actors': "",
'coming':"TRUE"
}
for i in info:
direc = []
genre = []
actor = []
if '감독' in i.text:
for a in i.text.split(':')[1].split(','):
direc.append(a.strip())
movieDic['directors'] = direc
if '장르' in i.text:
for a in i.text.split(':')[1].split('/')[0].split(','):
genre.append(a.strip())
movieDic['genres'] = genre
if '출연진' in i.text:
for x in i.text.split(':')[1].split(','):
actor.append(x.strip())
movieDic['actors'] = actor
total2.append(movieDic)
merged_movies = []
for movie1 in total2:
for movie2 in formatted_movies:
if movie1['movieNo'] == movie2['movieNo']:
merged_movie = {**movie1, **movie2}
merged_movies.append(merged_movie)
break
return merged_movies
def save_mega_mongo(merged_movies):
collection_name = "movies2"
collection = db[collection_name]
for movie in merged_movies:
existing_movie = collection.find_one({'moochu': movie['moochu']})
mega_new_movie=[]
if existing_movie:
if set(movie['OTT']) <= set(existing_movie.get('OTT', [])):
continue
else:
unique_OTTs = list(set(movie['OTT']) - set(existing_movie.get('OTT', [])))
collection.update_one({'moochu': movie['moochu']}, {'$addToSet': {'OTT': {'$each': unique_OTTs}}})
else:
movie['OTT'] = list(set(movie.get('OTT', [])))
collection.insert_one(movie)
insert_result = collection.insert_one(movie)
movie['_id'] = insert_result.inserted_id
mega_new_movie.append(movie)
return mega_new_movie
4. google storage 포스터 이미지 저장
- 마지막 task로 이 작업을 해놓은건 각각의 task가 mongodb에 제대로 적재된 이후 가능한 작업이기 때문이다.
- pymongo로 몽고db에 적재된 데이터를 가져오고
- 해당 데이터를 jpeg 파일로 저장하는 방식으로 진행하였다.
from pymongo import MongoClient
import requests
from google.cloud import storage
import os
from io import BytesIO
import connection
def upload_image_url_to_gcs(bucket_name, url, destination_blob_name):
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
response = requests.get(url)
with BytesIO(response.content) as f:
blob.upload_from_file(f, content_type='image/jpeg')
def upload_movie_poster_to_gcs(all_movies):
collection_name = "movies2"
collection = db[collection_name]
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = './river-dynamo-393506-f834b1182f86.json'
for movie in all_movies:
movie_id = movie['movie_id']
poster_image_url = movie['poster_img']
bucket_name = 'end_moochu'
destination_name = f'{str(movie_id)}.jpg'
upload_image_url_to_gcs(bucket_name, poster_image_url, destination_name)
print(destination_name)
5. slack api 설정
6. 소감
- 팀원들이 짜놓은 크롤링 코드가 제각각이었고, 처음에 어떤 데이터를 가져올 수 있을지 몰라서 각자 중요하다고 생각한 값 위주로 키 밸류 값을 설정해놓았었다.
- 이 과정에서 다시 크롤링을 해야하는 문제가 발생했고, 3~4개의 크롤링 코드를 다시 짜야하는
아주 기분좋은 경험을 하였다.
- airflow 설정시 빌드를 하였음에도 원하는 값들이 뜨지 않는 문제가 있었다.
- docker-compose.yml 파일을 다시 수정해서 이를 바로 잡았고, build가 실행되기 전에
7. 느낀점
- 만약 크롤링을 해야한다면 가져올 수 있는 값들은 모조리 가져온 이후 나중에 필요한 데이터를 추려서 사용해야겠다고 생각했다.
- airflow의 dag을 짜는 연습을 자주 할 필요가 있다.(아직 xcom이 어렵다)