[multiprocessing] Manager

About_work·2024년 3월 12일
0

process, thread

목록 보기
20/23

개요

  • 언제? 왜? 쓰는가?
    • 중앙 집중식: 공유 객체의 단일 인스턴스는, 별도의 서버 프로세스에서 유지
    • 프로세스 안전: 프록시 개체는 경쟁 조건을 피하기 위해, 중앙 집중화된 개체에 대한 액세스가 프로세스 안전한지 확인
      • 프록시 객체:
        • 프로세스 간에 공유되는 객체에 대한 직접적인 접근은 허용되지 않습니다.
        • 대신, Manager는 공유 객체의 프락시(proxy) 를 반환
        • 프락시는 원래 객체를 대리하는 객체로, 원래 객체에 대한 작업을 요청할 수 있게 해 줌
    • 선택 가능성:
      • 프록시 개체를 피클링하여, 프로세스 풀의 인수대기열의 항목과 같은 하위 프로세스와 공유할 수 있음
      • 프로세스 풀: 여러 자식 프로세스들을 생성하고 관리하는 역할
    • 관리자는
      • 한 시스템의 프로세스 간에 공유 객체에 대한 안전한 액세스를 제공하는 것 외에도
      • 네트워크 액세스를 통해 다른 시스템의 프로세스 간에 동일한 객체에 안전하게 액세스할 수 있습니다.
  • 프로세스 간에 객체를 공유하는 데 사용할 수 있는 시작된 SyncManager 객체를 반환
import multiprocessing

manager = multiprocessing.Manager()
final_list = manager.list()

input_list_one = ['one', 'two', 'three', 'four', 'five']
input_list_two = ['six', 'seven', 'eight', 'nine', 'ten']

def worker(data):
    for item in data:
        final_list.append(item)

process1 = multiprocessing.Process(target=worker, args=[input_list_one])
process2 = multiprocessing.Process(target=worker, args=[input_list_two])

process1.start()
process2.start()

process1.join()
process2.join()

print(final_list)

주요 메서드 및 어트리뷰트

  • list(): 프로세스 간에 공유할 수 있는 리스트를 반환합니다.
  • dict(): 프로세스 간에 공유할 수 있는 딕셔너리를 반환합니다.

  • Array(typecode, sequence): multiprocessing.Array와 유사하게, 프로세스 간에 공유할 수 있는 배열을 생성합니다.
    multiprocessing 모듈의 Manager 클래스는 프로세스 간에 데이터를 공유하기 위한 다양한 방법을 제공합니다. 여기에는 Namespace, Value, Array 등이 포함되며, 각각의 사용 방법에 대해 구체적인 예시를 들어 설명하겠습니다.

Namespace()

  • Namespace는 프로세스 간에 공유할 수 있는 간단한 객체로, 속성 접근 방식을 통해 데이터를 저장하고 검색할 수 있습니다.
    Namespace는 동적 속성 할당을 지원하므로, 런타임에 속성을 추가하거나 변경할 수 있습니다.
from multiprocessing import Process, Manager

def worker(ns):
    ns.x += 1
    ns.y *= 2

if __name__ == '__main__':
    with Manager() as manager:
        ns = manager.Namespace()
        ns.x = 1
        ns.y = 10
        
        p = Process(target=worker, args=(ns,))
        p.start()
        p.join()
        
        print(ns.x, ns.y)
        # 출력: 2 20

Value(typecode, value)

  • Value는 프로세스 간에 공유할 수 있는 단일 값을 저장하기 위해 사용됩니다.
  • typecode는 저장할 데이터의 타입을 나타내며, value는 초기값입니다.
  • Value는 기본적인 데이터 타입(예: 정수, 실수)을 공유하기 위한 것
from multiprocessing import Process, Manager

def worker(val):
    val.value += 5.5

if __name__ == '__main__':
    with Manager() as manager:
        num = manager.Value('d', 0.0)  # 'd'는 double을 의미
        
        p = Process(target=worker, args=(num,))
        p.start()
        p.join()
        
        print(num.value)
        # 출력: 5.5

Array(typecode, sequence)

  • Array는 프로세스 간에 공유할 수 있는 배열을 저장하기 위해 사용
  • typecode는 배열에 저장될 데이터의 타입을 지정하고, sequence는 배열의 초기값을 제공합니다.
  • 배열은 고정 길이를 가지며, 각 요소는 동일한 타입입니다.
from multiprocessing import Process, Manager

def worker(arr):
    for i in range(len(arr)):
        arr[i] = arr[i] ** 2

if __name__ == '__main__':
    with Manager() as manager:
        arr = manager.Array('i', range(5))  # 'i'는 정수를 의미
        
        p = Process(target=worker, args=(arr,))
        p.start()
        p.join()
        
        print(list(arr))
        # 출력: [0, 1, 4, 9, 16]

  • Lock(): 프로세스 간의 동기화를 위한 락 객체를 반환합니다.
  • RLock(): 재귀적 락을 제공합니다. 같은 프로세스 내에서 여러 번 획득할 수 있습니다.
  • Semaphore(value): 세마포어를 생성합니다. 동시에 리소스에 접근할 수 있는 프로세스의 수를 제한합니다.
  • Python의 multiprocessing.Manager를 사용하면 여러 프로세스 간에 데이터를 안전하게 공유할 수 있습니다.
  • Manager가 제공하는 프록시 객체(예: 공유 딕셔너리, 리스트 등)는 자체적으로 프로세스 안전(process-safe)하며, 내부적으로 동기화 메커니즘을 사용하여 경쟁 조건(race conditions)을 방지
    • 즉, 동시에 여러 프로세스에서 해당 객체에 접근해도 데이터의 일관성과 안정성이 유지
  • 그럼에도 불구하고, Lock이나 Semaphore 같은 동기화 메커니즘이 필요한 이유는 다음과 같습니다:
  1. 복합 작업의 원자성 보장:
  • Manager를 통해 공유되는 객체에 대한 단일 연산(예: 한 요소의 증가)은 프로세스 안전할 수 있습니다.
  • 그러나 여러 단계에 걸친 복합 작업(예: 여러 단계의 검사 및 업데이트)의 경우, 그 사이에 다른 프로세스가 개입할 수 있어 일관성이 깨질 수 있습니다.
  • 이런 경우, Lock을 사용하여 복합 작업 전체를 원자적으로 처리할 수 있습니다.
  1. 특정 섹션의 독점 접근 제어:
  • 어떤 자원이나 코드 섹션에 대한 독점적인 접근을 통제해야 할 때 Lock이나 Semaphore를 사용할 수 있습니다.
  • 예를 들어, 한 번에 하나의 프로세스만 특정 자원을 사용하게 하려면 Lock을, 제한된 수의 프로세스만 동시에 접근하게 하려면 Semaphore를 사용합니다.
    1. 비공유 자원의 동기화:
  • 파일 시스템에 대한 접근이나 외부 시스템과의 통신 등 Manager를 통해 공유되지 않는 자원에 대한 접근을 동기화해야 할 때도 있습니다.
  • 이런 경우에도 Lock이나 Semaphore 등을 사용하여 동시 접근을 조절할 수 있습니다.
  • 즉, Manager가 프로세스 안전한 공유 객체를 제공하더라도, 모든 유형의 동기화 문제를 자동으로 해결하는 것은 아닙니다.
  • 따라서 복잡한 동기화 요구 사항이 있는 애플리케이션에서는 추가적인 동기화 메커니즘을 적절히 사용해야 합니다.
from multiprocessing import Process, Manager, Lock, Semaphore
from typing import Dict

def task_with_lock(lock: Lock, data: Dict[str, int]) -> None:
    with lock:
        value = data['counter']
        time.sleep(0.1)  # 시뮬레이션을 위한 지연
        data['counter'] = value + 1

def task_with_semaphore(semaphore: Semaphore, data: Dict[int, str], process_number: int) -> None:
    with semaphore:
        data[process_number] = f"Process {process_number} was here"
        time.sleep(0.5)  # 시뮬레이션을 위한 지연

# 예시 코드 실행 부분은 동일하며, 함수 호출 시 타입 어노테이션이 적용된 파라미터를 전달합니다.

  • Event(): 이벤트 객체를 반환합니다. 프로세스 간의 이벤트 통지를 위해 사용됩니다.
  • Condition(): 조건 변수 객체를 반환합니다. 프로세스 간에 복잡한 동기화 패턴을 구현할 때 사용됩니다.
  • Queue(maxsize): 프로세스 간에 데이터를 주고받을 수 있는 큐를 생성합니다. maxsize는 큐의 최대 아이템 수를 지정합니다.
  • Barrier(parties, action=None, timeout=None): 모든 프로세스가 바리어 지점에 도달할 때까지 기다리게 하는 동기화 객체입니다.

사용 예시

from multiprocessing import Process, Manager

def worker(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        # 공유 딕셔너리와 리스트 생성
        d = manager.dict()
        l = manager.list(range(10))
        
        # 작업자 프로세스 생성 및 실행
        p = Process(target=worker, args=(d, l))
        p.start()
        p.join()
        
        print(d)
        print(l)

이 예시에서는 Manager()를 사용하여 프로세스 간에 공유되는 딕셔너리와 리스트를 생성합니다. 그런 다음 Process를 사용하여 별도의 프로세스에서 이 데이터 구조들을 수정하는 작업을 수행합니다. Manager를 사용하면 서로 다른 프로세스에서 실행되는 코드 간에 복잡한 데이터 구조를 쉽게 공유하고 동기화할 수 있습니다.

profile
새로운 것이 들어오면 이미 있는 것과 충돌을 시도하라.

0개의 댓글