AWS Lambda에서 Multiprocessing 적용하기

김기욱·2022년 2월 3일
2

TIL

목록 보기
8/11

선요약

파이썬기반의 AWS Lambda를 사용하는 경우 Process class + Manager class를 활용한다

람다의 한계에 관해서

AWS lambda는 분명 서버리스 아키텍처를 구축할 수 있는 합리적이고 편리한 서비스입니다.
하지만 치명적인 단점이 있습니다. 함수의 최대 실행시간이 15분으로 제한되어있기 때문입니다.
그러므로 배치성 대용량 데이터 처리 시 람다를 활용하려고 한다면 이런 시간제한 굉장히 큰 걸림돌로 작용하게 됩니다.

Multiprocessing

람다 대신 AWS Batch 등과 같이 배치성 데이터 처리에 적합한 다른 서비스를 쓰는 방식도 있겠지만 저는 그것보다는 파이썬 내부함수를 최대한 활용해 문제를 해결하고 싶었습니다.

가장 먼저 활용하려 했던 방식은 Pool class + starmap + Manager class를 활용한 처리방식입니다. 간단하게 소스코드로 확인해보겠습니다.

import multiprocessing as mp

# 멀티프로세스 pool 생성 
pool = mp.Pool(6)

# 매니저 클래스 호출
manager = mp.Manager()

# 공유리스트 생성
origin_list = manager.list()

# limit & offset 용 count
count_list = (0, 1, 2, 3, 4, 5)

# 멀티프로세싱 
pool.starmap(self._test_func, [(origin_list, count) for count in count_list]) # 대용량 쿼리가 실행되는 함수 
pool.close()
pool.join()

# 멀티프로세싱 종료 후 정렬
final_list = [v['result'] for v in sorted(origin_list, key=lambda x : x['count'])]

# 결과합산
result = sum(final_list, [])

결론만 말하자면 해당 로직은 로컬에선 문제없으나 실제 lambda함수에 배포하고 실행하면 실행되지않습니다. 람다에서는 Queue와 Pool class를 지원하지 않기 때문입니다.

AWS 병렬식 처리 예시문서에 이미 명시되어있습니다.

The multiprocessing module that comes with Python 2.7 lets you run multiple processes in parallel. Due to the Lambda execution environment not having /dev/shm (shared memory for processes) support, you can’t use multiprocessing.Queue or multiprocessing.Pool.

다음으론 공식문서나 여러곳에서 권장하는 방식인 Process + Pipe 방식의 병렬처리입니다.
해당 방식의 워크플로우는 Understanding Multiprocessing in AWS Lambda with Python 해당 미디엄을 참조하시면 됩니다.

from multiprocessing import Pipe, Process

# limit & offset 용 count
count_list = (0, 1, 2, 3, 4, 5)

# 멀티프로세싱 
processes = []
parent_connections = []

for count in count_list:         
    # 결과값 합산을 위한 파이프 객체 생성   
    parent_conn, child_conn = Pipe()
    parent_connections.append(parent_conn)

    # 프로세스 생성
    process = Process(target=self._test_func, args=(count, child_conn))
    processes.append(process)

# 멀티프로세스 스타트
for process in processes:
    process.start()

# 멀티프로세스 종료
for process in processes:
    process.join()

origin_list = []

# 결과값 합산
for parent_connection in parent_connections:
    origin_list.append(parent_connection.recv()) 

# 멀티프로세싱 종료 후 정렬
final_list = [v['result'] for v in sorted(origin_list, key=lambda x : x['count'])]

# 평탄화
result = sum(final_list, [])

하지만 이 방식엔 문제가 있습니다. Pipe의 send close recev 함수를 실행하는 구간에서 버퍼가 걸리는 경우가 종종 발생합니다. 특히 하나의 프로세스가 처리하는 데이터의 크기가 커질 경우 이런 현상을 자주 확인했습니다.

from multiprocessing import Manager, Process

# limit & offset 용 count
count_list = (0, 1, 2, 3, 4, 5)

# 멀티프로세싱 
processes = []

# 공유리스트
origin_list = Manager().list()

for count in count_list:         
    # 프로세스 생성
    process = Process(target=self._test_func, args=(count, origin_list))
    processes.append(process)

# 멀티프로세스 스타트
for process in processes:
    process.start()

# 멀티프로세스 종료
for process in processes:
    process.join()

# 정렬 & 평탄화
# count의 숫자에 맞춰 쿼리 결과값을 정렬해줌
result = sum([v['result'] for v in sorted(origin_list, key=lambda x : x['count'])], [])

결론적으로 제가 채택한 방식은 두 가지 방식을 혼합한 Manager + Process입니다. Lambda에서 이상없이 돌아갔으며 결과도 깔끔하게 나왔습니다. 혹시 람다에 멀티프로세싱을 통해 대용량 배치작업을 하시려는 분들에게 도움이 되었으면 좋겠습니다. 감사합니다.

profile
어려운 것은 없다, 다만 아직 익숙치않을뿐이다.

0개의 댓글