[ray]로 크롤링 속도 개선

포동동·2023년 4월 23일
1

[유튜브 프로젝트]

목록 보기
9/13

문제상황

python 병렬처리 프레임워크인 ray를 기존 코드에 도입하여 속도 개선을 도모한다.


현재상황

네이버, 유튜브, AI 모델에서 수집/생성되는 연관어 데이터를 pandas DataFrame으로 만들고 SQLAlchemy를 이용해 DB에 저장한다.

def naver(keyword:list) -> list :
	...
    return naver_related_words

def youtube(keyword:list) -> list :
	...
    return youtube_related_words

def gensim(keyword:list) -> list :
	...
    return gensim_related_words

def to_df(*args) :
 	keyword = args[0]
    
    # 데이터프레임으로 만들면서 중복제거
    total = [','.join(list(set(x+y+z))) for x,y,z in zip(args[1], args[2], args[3])]
    rel_df = pd.DataFrame(zip(search_keyword, total))
    rel_df.columns = ['keyword', 'related_words']
    
    return rel_df

def to_db(rel_df) :
    ...
    return 

if __name__ == "__main__" :
	search_keyword = [...]
    
   	n_li = naver(keyword)
    y_li = youtube(keyword)
    g_li = gensim(keyword)
    rel_df = to_df(search_keyword, n_li, y_li, g_li)
    to_df(rel_df)
    
    print("Data insert done.")
        
    

이 상태로도 사실 속도가 엄청 느린 것은 아니지만 이왕 ray를 알게 된 김에 병렬처리를 해보자 마음 먹고 시도했다.


고려사항

  1. 데이터 수집 함수를 하나로 만들어서 ray.remote하는 함수를 하나로 관리할까.
  2. 현재는 각 함수가 반환하는 건 리스트 형태지만, ray를 이용하면 비동기적으로 task가 실행되기 때문에, 리스트로 받는 키워드들 중 각 단어들이 랜덤으로 실행되서 키워드와 연관어를 매칭시킬 수 없다.

이에 대해 생각해봤을 때,

  • 1번에 대해서, 공식 ray 깃헙에서도 ray실행 시 문제가 생기면 함수를 쪼개는 것을 우선 추천한다. 게다가 네이버는 requests를, 유튜브는 selenium을 쓰므로 결국 먼저 작업이 끝난 requests가 selenium을 기다린다면 큰 의미가 있을까 싶어서 보류.
  • 2번에 대해서, 예를 들어, 들어온 키워드 리스트가 ['사과', '딸기', '오렌지']라면 네이버, 유튜브, gensim에서 어떤 키워드를 먼저 픽해서 데이터를 수집할 지 모르기 때문에, 현재처럼 연관어 리스트만 반환하지 않고 딕셔너리데이터프레임으로 반환하여 키워드를 key로, 연관어를 value 형태로 가져가야 나중에 합치기 편할 것이다.

1번은 2번이 너무 시간이 오래걸리면 시도해보기로 하고, 2번 방식을 시도해보았다. 다만, 네이버, 유튜브, gensim에서 수집되는 데이터를 모두 데이터프레임으로 반환하면 너무 비효율적일 것 같아서 딕셔너리로 반환하는 방식을 선택했다.


해결방법

최종적으로 네이버, 유튜브, gensim return 딕셔너리 -> 각 사이트에서 연관어 수집하고 dictionary인 채로 중복 제거 -> DB insert 방식으로 결정했다.

즉, return 값으로 설명하면

  • 기존 코드
    • naver(키워드 리스트)
      • return [[키워드1에 대한 연관어], [키워드2에 대한 연관어], ...]
    • youtube(키워드 리스트)
      • return [[키워드1에 대한 연관어], [키워드2에 대한 연관어], ...]
    • gensim(키워드 리스트)
      • return [[키워드1에 대한 연관어], [키워드2에 대한 연관어], ...]

👉 이 뒤에 데이터프레임 형태로 중복제거

  • 수정 코드
    • naver(키워드 한 개)
      • return {키워드:연관어}
    • youtube(키워드 한 개)
      • return {키워드:연관어}
    • gensim(키워드 한 개)
      • return {키워드:연관어}

👉 ray로 각 함수들을 병렬 처리 한 뒤, dictionary들을 key에 따라 합치면서 set을 통해 중복없이 value들만 합친다.


문제 해결

import ray

@ray.remote
def naver(keyword:str) -> list :
	...
	naver_rel_words = {keyword:rel_words}
    return naver_rel_words

@ray.remote
def youtube(keyword:str) -> list :
	...
	youtube_rel_words = {keyword:rel_words}
    return youtube_rel_words

@ray.remote
def gensim(keyword:str) -> list :
	...
	gensim_rel_words = {keyword:rel_words}
    return gensim_rel_words

if __name__ == "__main__" :
	search_keyword = [...]
    
   	num_cpus = 8

    # ray 시작
    ray.init(num_cpus = num_cpus, ignore_reinit_error=True)

    # 키워드별로 ray 함수 실행
    results = []
    for keyword in search_keyword:
        result_naver = naver_relwords.remote(keyword)
        result_youtube = youtube_relwords.remote(keyword)
        result_gensim = gensim_relwords.remote(keyword)
        results.append([result_naver, result_youtube, result_gensim])

    # 결과 합치기
    related_keywords_dict = {}
    for result in results:
        result_naver, result_youtube, result_gensim = ray.get(result)

        for d in [result_naver, result_youtube, result_gensim]:
            for k, v in d.items():
                related_keywords_dict.setdefault(k, set()).update(set(v))
    
    print("Data insert done.")

기존 코드 기준으로 검색어 100개의 연관어를 크롤링하는데 걸린 시간은 250.27224 sec였는데, 수정 코드 기준으로는 269.93527 sec이다. 왜...?

다시 문제상황

수정한 코드로는 로직이 다음과 같다.

  1. 키워드 리스트에서 키워드마다 naver, youtube, gensim의 함수들을 remote해서 task id들을 쪼르륵 받는다(아직 함수가 실행된 게 아니라 id만 부여받은 것).
  2. 이걸 한 번에 results라는 리스트에 한 번에 넣는다.
  3. 그리고 그 results를 돌면서 result마다 ray.get을 통해 함수들을 실행시킨다.
  4. result마다 결과값들(result_naver, result_youtube, result_gensim)을 가지고 한 번에 다 합쳐주는 작업을 result를 도는 for문 안에서 실행한다.

즉, 내가 생각한 지금 로직의 문제는 한 result, 즉, 한 개의 키워드에 대해 remote된 함수들을 get 시키고 합치는 로직을 한 번에 다 끝내야 다음 키워드로 넘어갈 수 있기 때문에 병렬 처리로써 역할을 못 하고 있다는 것이다.

내가 해야하는 작업은, results에 담긴 함수들을 순서 상관없이 되는대로 얼릉얼릉 어떤 result 순서든 상관없이 get을 시키는 것이다. 그래서 아래와 같이 코드를 수정했다.

    for keyword in search_keyword:
    	# ray.wait에 task를 올리기 위해서는 반드시 ray.ObjectRef 형태여야 하기 때문에 remote결과를 바로 results 리스트에 담아준다. 
        result_naver = naver_relwords.remote(keyword)
        results.append(result_naver)

        result_youtube = youtube_relwords.remote(keyword)
        results.append(result_youtube)

        result_gensim = gensim_relwords.remote(keyword)
        results.append(result_gensim)

    # 결과 합치기
    ready, remaining = ray.wait(results)

    while ready:
        for result in ready:
            result_naver, result_youtube, result_gensim = ray.get(result)

            for d in [result_naver, result_youtube, result_gensim]:
                for k, v in d.items():
                    related_keywords_dict.setdefault(k, set()).update(set(v))

            result_naver.unpin()
            result_youtube.unpin()
            result_gensim.unpin()

        ready, remaining = ray.wait(remaining)

하지만, 계속해서 에러가 났다. 주로 나온 에러는 아래와 같았다.

result_naver, result_youtube, result_gensim = ray.get(result)
ValueError: not enough values to unpack (expected 3, got 1)

내가 파악하기로는 ray.wait를 실행했을 때, 연관어가 없는 함수의 경우에는 task id를 반환하지 않아서 이 ray가 get을 할 때 3개 준다면서 왜 1개나 2개가 없어? 였다. 즉, 알아서 none을 넘겨서 결과값이 있는 task들만 자동으로 실행하고 결과값이 없는(연관어가 없는) task들은 함수의 결과처럼 비어있는 딕셔너리를 반환해주지 않는 것이다. -> 추후 공부해서 알아내야겠다.

결국, 병렬처리가 반드시 필요한 selenium을 사용하는 youtube 함수만 ray를 붙이고, 나머지 naver랑 gensim 함수는 그냥 기존 함수를 사용하기로 했다.


그 결과, 416.60513 sec라는 결과가 나왔다. 왜 때문에....결국 처음 코드대로 keyword list 자체를 넘기고 그 안에서 계속해서 연관어를 찾는 것이 훨씬 빨랐다는....ㅎㅎㅎ

오늘도 삽질의 연속이었다.

profile
완료주의

0개의 댓글