1. 장단점/사용목적
1.1. 사용 시나리오
- 대량의 데이터(예: numpy 배열)를 프로세스 간에 공유할 때 사용
1.2. 장점
- 메모리를 직접 공유하기 때문에 대량의 데이터를 빠르게 전달할 수 있음.
데이터 복사 과정이 없어, 다른 IPC 메커니즘에 비해 효율적
1.3. 단점
- 동시성 제어(comcurrency control)를 개발자가 직접 관리해야 함
- 예를 들어, 뮤텍스(mutexes)나 세마포어(semaphores)를 사용해야 할 수도 있음
- 데이터 구조가 고정되어야 하며, 동적인 데이터 구조에는 적합하지 않을 수 있음
네트워크를 통해 다른 머신들과 array를 공유할 때에는 작동하지 않는다.
- 이럴 때는 redis나 다른 기술에 의존해야 한다.
1.4. 지원 가능한 데이터 type
- 바이트-호환 데이터 (bytes, bytearray 등)
- array.array, numpy.array 같은 배열 데이터
- C의 구조체를 모방한 ctypes 모듈의 데이터 구조
2. 사용법
SharedMemory(name=None, create=False, size=0)
2.1. 본문
- 새 공유 메모리 블록을 만들거나 기존 공유 메모리 블록에 연결
- 각 공유 메모리 블록에는 고유한 이름이 지정
- 이런 식으로, 하나의 프로세스가 특정 이름을 가진 공유 메모리 블록을 생성 할 수 있으며,
- 다른 프로세스가 같은 이름을 사용하여 같은 공유 메모리 블록에 연결할 수 있음
- 공유 메모리 블록은 생성한 원래 프로세스보다 오래갈 수 있음
close() 메서드
- 한 프로세스가, 더는 다른 프로세스가 필요로 할 수도 있는 공유 메모리 블록에 대한 액세스를 필요로하지 않으면
unlink()
- 어떤 프로세스에서도 공유 메모리 블록이 더는 필요하지 않으면, 적절한 정리를 위해
2.2. parameter 소개
- name
- 문자열로 지정된 요청된 공유 메모리의 고유한 이름
- 새 공유 메모리 블록을 만들 때, 이름에 None(기본값)이 제공되면, 새로운 이름이 생성
- create
- 새 공유 메모리 블록을 만들지(True), 또는 기존 공유 메모리 블록을 연결할지(False)를 제어
- size
- 새 공유 메모리 블록을 만들 때 요청된 바이트 수를 지정
- 일부 플랫폼은, 해당 플랫폼의
메모리 페이지 크기를 기반으로 메모리 덩어리를 할당
하기 때문에, 공유 메모리 블록의 정확한 크기는 요청한 크기보다 크거나 같을 수 있음
- 기존 공유 메모리 블록에 연결할 때는, size 매개 변수가 무시
- 새로 공유 메모리 생성할 때
create=True / size=크기
- 기존 메모리에 붙일 때
from multiprocessing import Process, shared_memory, Semaphore
import numpy as np
import time
from typing import Tuple
def worker(id: int, number: int, np_array: np.ndarray,
shm: shared_memory.SharedMemory, sam: Semaphore) -> None:
"""작업자 프로세스에서 실행되는 함수.
Args:
id (int): 작업자 ID.
number (int): 반복할 숫자.
np_array (np.ndarray): 공유할 NumPy 배열.
shm (shared_memory.SharedMemory): 공유 메모리 객체.
sam (Semaphore): 세마포어 객체.
"""
b = np.ndarray(np_array.shape, dtype=np_array.dtype, buffer=shm.buf)
sam.acquire()
for i in range(number):
b[0] += 1
sam.release()
if __name__ == "__main__":
sam: Semaphore = Semaphore(1)
np_array: np.ndarray = np.array([0])
start_time: float = time.time()
shm: shared_memory.SharedMemory = shared_memory.SharedMemory(
create=True, size=np_array.nbytes)
b: np.ndarray = np.ndarray(np_array.shape,
dtype=np_array.dtype,
buffer=shm.buf)
th1: Process = Process(target=worker,
args=(1, 50000000, np_array, shm, sam))
th2: Process = Process(target=worker,
args=(2, 50000000, np_array, shm, sam))
th1.start()
th2.start()
th1.join()
th2.join()
print("--- %s seconds ---" % (time.time() - start_time))
print(f"sheard memory : {b[0]}")
shm.close()
shm.unlink()
print("total_number=", end="")
print("end of main")
import numpy as np
from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
from typing import Tuple, Any
def modify_shared_memory(name: str, shape: Tuple[int, ...], dtype: Any) -> None:
"""공유 메모리에 접근하여 NumPy 배열을 수정하는 함수.
Args:
name (str): 공유 메모리의 이름.
shape (Tuple[int, ...]): NumPy 배열의 형태.
dtype (Any): NumPy 배열의 데이터 타입.
"""
existing_shm = SharedMemory(name=name)
np_array = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
np_array += 1
existing_shm.close()
if __name__ == '__main__':
original_array: np.ndarray = np.arange(10, dtype=np.float64)
print("Original array:", original_array)
shm: SharedMemory = SharedMemory(create=True, size=original_array.nbytes)
shared_array: np.ndarray = np.ndarray(original_array.shape,
dtype=original_array.dtype,
buffer=shm.buf)
shared_array[:] = original_array
p: Process = Process(target=modify_shared_memory,
args=(shm.name, original_array.shape,
original_array.dtype))
p.start()
p.join()
print(shared_array)
shm.close()
shm.unlink()
5.4.5. Semaphores / Mutexes
multiprocessing
모듈을 사용하여 SharedMemory
를 이용할 때, 뮤텍스(mutexes)나 세마포어(semaphores)와 같은 동기화 메커니즘을 사용하여 프로세스 간의 데이터 접근을 동기화할 수 있음
- 이러한 동기화 기법은 데이터 경쟁(두 개 이상의 프로세스가 동시에 공유 데이터를 수정하려 할 때 발생하는 문제)을 방지하는 데 중요
5.4.5.1. 세마포어(Semaphores)
- 세마포어는 동시에 리소스에 접근할 수 있는 프로세스의 수를 제한하는 방법으로 사용
multiprocessing
모듈에서는 Semaphore
를 사용하여 이를 구현할 수 있음
5.4.5.2. 뮤텍스(Mutexes)
- 뮤텍스는 세마포어의 특별한 경우로, 동시에 한 프로세스만이 리소스에 접근할 수 있게 함
- Python에서는
Lock
을 사용하여 뮤텍스를 구현할 수 있음
5.4.5.3. 예제 코드
from multiprocessing import Process, Semaphore, shared_memory
import numpy as np
import time
def worker(shm_name, sem: Semaphore):
shm = shared_memory.SharedMemory(name=shm_name)
array = np.ndarray((10,), dtype=np.int64, buffer=shm.buf)
with sem:
for i in range(10):
array[i] += 1
print("Data updated by process:", array[:])
if __name__ == "__main__":
original_data = np.arange(10, dtype=np.int64)
shm = shared_memory.SharedMemory(create=True, size=original_data.nbytes)
array = np.ndarray((10,), dtype=np.int64, buffer=shm.buf)
array[:] = original_data
sem = Semaphore(1)
processes = [Process(target=worker, args=(shm.name, sem)) for _ in range(2)]
for p in processes:
p.start()
for p in processes:
p.join()
print("Final data in shared memory:", array[:])
shm.close()
shm.unlink()
- Python의
multiprocessing.shared_memory.SharedMemory
클래스는
- 공유 메모리 세그먼트를 생성할 때, 명시적으로 크기를 지정하지 않으면 내부적으로 정해진 기본 크기를 사용합니다.
- 이 기본 크기는 운영 체제와 Python의 구현에 따라 달라질 수 있으며, 특정 환경에서 16384 (16KB)로 설정되어 있을 수 있습니다.
- 공유 메모리 객체를 생성할 때
size
매개변수에 직렬화된 데이터의 길이를 전달하고 있지만,
- 실제로 공유 메모리 세그먼트의 최소 할당 크기는 시스템의 페이지 크기나 다른 내부 정책에 의해 결정될 수 있습니다.
- 많은 운영 체제에서는 메모리를 페이지 단위로 관리하며, 이 페이지의 크기는 보통 4KB 또는 그 이상일 수 있습니다.
- 따라서 요청된 크기가 이 페이지 크기보다 작더라도, 실제 할당되는 메모리 크기는 페이지 크기 또는 여러 페이지의 크기가 될 수 있습니다.
SharedMemory
객체가 생성될 때, 요청된 크기보다 큰 메모리 세그먼트가 할당되는 경우,
할당된 실제 크기는 size
속성을 통해 확인할 수 있습니다.
- 이 경우, 할당된 공유 메모리의 크기는 운영 체제와 Python 구현의 내부 정책에 따라 결정되며, 특정 환경에서는 이 크기가 16384로 고정될 수 있습니다.
- 이러한 행동은 특히 직렬화된 데이터의 크기가 다양하더라도 관찰될 수 있으며, 공유 메모리 사용을 위한 최소 요구 사항을 충족하기 위한 시스템의 방식입니다.
- 16384 바이트보다 큰 데이터를 저장하려는 경우,
- 예를 들어, 직렬화된 데이터의 크기가 20000 바이트라면, SharedMemory 객체의 size 매개변수로 20000을 지정하게 됩니다.
- 이 경우, 운영 체제는 최소한 20000 바이트를 저장할 수 있는 공유 메모리 세그먼트를 할당하게 됩니다.
- 실제 할당된 공유 메모리 세그먼트의 크기(existing_shm.size)는 20000 바이트 이상이 될 것이며, 이는 운영 체제의 페이지 크기와 메모리 관리 정책에 따라 결정됩니다.
- 운영 체제는 메모리를 페이지 단위로 관리하기 때문에, 요청된 크기가 페이지 크기의 정수 배가 아닐 경우,
- 실제 할당된 크기는 요청된 크기를 포함할 수 있는 가장 작은 페이지 크기의 배수가 됩니다.
- 예를 들어, 페이지 크기가 4096 바이트이고 요청된 크기가 20000 바이트라면,
- 할당된 공유 메모리의 크기는 20480 바이트(4096 * 5)가 될 수 있습니다.
- 이는 existing_shm.size를 통해 확인할 수 있습니다.