[CS/Python]Multiprocessing(2)

Jay·2023년 1월 31일
0
post-thumbnail

Process Communication

파이썬의 multiprocessing 모듈에서는 프로세스 간의 통신 채널로 QueuePipe를 지원합니다.

Queue

Queue의 경우 queue모듈의 Queue 클래스의 클론에 가깝습니다. 여러개의 프로세스가 하나의 Queue를 바라보고, 데이터를 삽입/추출하는 방식을 통해 데이터를 전달합니다.

import time
import multiprocessing

def set_data(q):
    p = multiprocessing.current_process()
    msg = "Hello World"
    q.put(msg)													# queue에 데이터 삽입
    print ("[%s] set queue data : %s" % (p.name, msg))

def get_data(q):
    time.sleep(1)
    p = multiprocessing.current_process()
    print ("[%s] get queue data : %s" % (p.name, q.get()))		# queue에서 데이터 추출

def main():
    queue = multiprocessing.Queue()

    p1 = multiprocessing.Process(name="set_data", target=set_data, args=(queue,))
    p1.start()

    p2 = multiprocessing.Process(name="get_data", target=get_data, args=(queue,))
    p2.start()

    p1.join()
    p2.join()

if __name__ == "__main__":
    main()

프로세스 간에 데이터를 전달하기 위한 queue를 생성하여 set_data, get_data 두 프로세스에게 인자로 전달하였습니다. set_data 프로세스에서는 데이터를 생성하여 queue에 삽입하고, get_data 프로세스에서는 데이터를 추출하여 출력합니다. 이처럼 프로세스 간 공유되는 queue를 만들고 데이터를 queue에 삽입/삭제 하는 방식을 통해 전달할 수 있습니다. 이와 같은 multiprocessing 모듈의 Queue는 스레드와 프로세스 간의 데이터 무결성을 보장하기 때문에 무결성에 대한 조치 없이도 사용할 수 있습니다.

[set_data] set queue data : Hello World
[get_data] get queue data : Hello World

Queue API

  • get(block=True, timeout=None)
    큐에 저장된 객체를 반환하며 삭제합니다. 인자 blockTrue이면 timeout 만큼 데이터가 들어올 때까지 블록하게 됩니다. timeout이 양수인 경우, blockTrue이면 timeout만큼 블록하고 해당 시간동안 객체가 들어오지 않으면 queue.Empty 예외를 발생시킵니다.
  • put(obj, block=True, timeout=None) :
    objqueue에 넣습니다. blockTrue이고 timeoutNone이면 빈 슬롯이 생길때까지 블록하게 됩니다. timeout이 양수인 경우 timeout초만큼 블록하고 그 동안 빈 슬롯이 생기지 않으면 queue.Full 예최를 발생시킵니다.

Pipe

한번에 여러 프로세스와 정보를 주고 받을 수 있는 Queue와 달리 Pipe는 1:1 통신 방식을 지원합니다. 따라서 Pipe를 사용하게 되면 마치 client-server 처럼 2개의 pipe 객체를 반환받고 이를 통해서 메세지를 주고 받게 됩니다.

import multiprocessing

def child(pipe):
    p = multiprocessing.current_process()
    msg = "Hello World"
    pipe.send(msg)												# 데이터 전송
    print ("[%s] Send a message to pipe : %s" % (p.name, msg))

def main():
    parent_pipe, child_pipe = multiprocessing.Pipe()			# 2개의 pipe 객체 반환
    p = multiprocessing.Process(name="child", target=child, args=(child_pipe,))
    p.start()
    print ("Recieved message : %s" % parent_pipe.recv())		# 데이터 수신
    p.join()

if __name__ == "__main__":
    main()

Pipe 생성자는 Pipe의 양 끝에 해당하는 2개의 객체를 반환합니다. 반환된 2개의 객체를 서로 다른 프로세스에게 전달하고, 두 프로세스는 해당 pipe객체를 통해 데이터를 주고받게 됩니다.

[child] Send a message to pipe : Hello World
Recieved message : Hello World

Pipe API

class multiprocessing.connection.Connection

  • send(obj) : 연결의 반대편 끝에서 recv()를 사용하여 읽을 객체를 보냅니다. 단, 전송하는 객체는 피클이 가능해야 합니다.
  • recv() : 연결의 반대편 끝에서 send(obj)를 통해 보낸 객체를 반환합니다. 객체를 수신할 때까지 블록하며, 수신할 내용이 없는 상태로 반대편 끝이 닫히게 된다면 EOFError를 발생시킵니다.
  • close() : 연결을 닫습니다.


Process 메모리 공유

스레드나 프로세스와 같이 동시성을 사용하는 로직을 설계할 때는 메모리를 공유하지 않는 것이 가장 좋습니다. 동시에 하나의 메모리에 접근하여 어떤 동작을 하게된다면 메모리의 무결성을 보장하기 위한 로직을 구현해야하기 때문입니다.

하지만 메모리를 필수적으로 공유해야하는 상황이 발생할 수 있습니다. 이러한 경우를 대비하여 multiprocessing 모듈은 프로세스가 메모리를 공유할 수 있도록 Server process를 사용하는 방법과 실제로 프로세스간에 메모리를 공유할 수 있도록 하는 방법을 제공하고 있습니다.


프로세스 메모리 공유

여러 프로세스에게 공유되는 메모리 공간을 shared memory라고 부릅니다. multiprocessing 모듈에서는 공유된 메모리 공간에 저장할 수 있는 ValueArray라는 API를 제공합니다.

Value, Array API

import multiprocessing

def worker(num, num_list):
    p = multiprocessing.current_process()
    print ("[%s] num : %s" % (p.name, num.value))
    for idx, value in enumerate(num_list):
        print ("[%s] num list[%s] : %s" % (p.name, idx, value))

    num.value = 50												# value 수정
    for i in range(len(num_list)):								# array 수정
        num_list[i] = num_list[i] * 10
	# num_list = [x*10 for x in num_list]						# list comprehension 사용 불가

def main():
    single_integer = multiprocessing.Value("i", 5)				# Value 생성(type, 초기화 값)
    integer_list = multiprocessing.Array("i", range(10))		# Array 생성(type, 초기화 값)

    p = multiprocessing.Process(name="worker", target=worker, args=(single_integer, integer_list))
    p.start()

    p.join()
    print ("num : %s" % (single_integer.value))
    for idx, value in enumerate(integer_list):
        print ("num list[%s] : %s" % (idx, value))

if __name__ == "__main__":
    main()

main 프로세스에서는 ValueArray 객체를 생성시키고 worker 프로세스에 전달하였습니다. worker 프로세스에서는 전달받은 ValueArray 객체의 값을 출력한 후, 값을 수정한 다음 다시 출력하는 작업을 수행합니다.

ValueArray API는 내부적으로 프로세스와 스레드 간의 무결성이 보장됩니다. 따라서 해당 객체들을 사용하는 프로세스에서 데이터를 조회, 수정 등 다양한 작업을 수행하더라도 별도의 무결성을 위한 로직을 구현하지 않고 프로세스끼리 공유하는 메모리 공간을 사용할 수 있습니다.

주의사항

데이터를 읽기와 쓰기를 동시에 수행하는 복합대입연산자의 경우 무결성이 깨지거나, List comprehension의 경우 의도한대로 동작하지 않을 수 있습니다. 복합대입연산자의 경우 원자적 연산이 아닙니다. 따라서 공유되는 메모리에 존재하는 데이터를 원자적으로 증가시키려면 아래와 같은 방법으로 구현해야 합니다.

num.value += 10 		# 원자적 방법 x
with num.get_lock():	# lock을 획득하여 원자적으로 수행하도록한다.
	num.value += 10	

만약 위의 방법처럼 공유 메모리의 자원을 수정하게 되는 경우, 데이터의 읽기, 쓰기 동작 사이에 다른 프로세스에서 연산을 수행하게 되어 데이터의 값이 더렵혀져 무결성이 깨지게 되는 문제가 발생할 수 있습니다.
List comprehension의 경우에도 마찬가지입니다. list comprehension의 경우 loop을 돌며 lock을 획득하지만 값을 초기화하기 위한 lock은 획득하지 못하여 값이 변경되지 않게 됩니다. list comprehension을 사용하여도 오류가 발생하지는 않지만 원하는 결과대로 동작하지 않게 되므로, shared memory를 통한 데이터 공유시에는 주의해서 사용하여야 합니다.


Server Process

서버 프로세스는 별도의 메모리 공간과 데이터를 가지고 있는 server process가 존재하고, 이 server process에 요청을 통해 데이터를 받아오고, 수정해서 다시 서버로 요청을 보내는 방식으로 메모리를 공유하는 방식입니다. 내부적으로 자원을 관리하는 프로세스와, 해당 프로세스의 메모리 공간에 저장된 데이터를 공유하며 자원을 관리하는 방법입니다. 공유되는 프로세스에 접근해서 데이터에 관한 동작을 할 수 있도록 하는 것이 manager API입니다.

manager API

import multiprocessing

def print_array_or_list(name, values):
    for idx, value in enumerate(values):
        print ("[%s] num list[%s] : %s" % (name, idx, value))

def worker(v, a, l, d):
    p = multiprocessing.current_process()
    print ("[%s] value : %s, dict : %s" % (p.name, v, d["key"]))
    print_array_or_list(p.name, a)
    print_array_or_list(p.name, l)

    v.value = 50
    for i in range(len(a)):
        a[i] = a[i] * 10

    for i in range(len(l)):
        l[i] = l[i] * 10

    d["key"] = "Python3"

def main():
    manager = multiprocessing.Manager()					# SyncManager 객체 반환

    v = manager.Value("i", 5)							
    a = manager.Array("i", range(10))
    l = manager.list(range(10))
    d = manager.dict()
    d["key"] = "Python2"

    p = multiprocessing.Process(name="worker", target=worker, args=(v, a, l, d))
    p.start()

    p.join()
    main_name = "main"
    print ("[%s] value : %s, dict : %s" % (main_name, v, d["key"]))
    print_array_or_list(main_name, a)
    print_array_or_list(main_name, l)

if __name__ == "__main__":
    main()

main프로세스에서 SyncManager 객체를 생성하고, 해당 manager 사용하여 공유되는 Value, Array, list, dict 객체를 생성하고 해당 proxy를 변수에 할당하였습니다. 이와 같은 proxy들은 shared memory에 존재하는 객체처럼 무결성이 보장되며, 여러 프로세스에게 공유될 수 있습니다.

Server process는 공유되는 객체로 Event, Locl, Namespace, Queue, RLock, Semaphore, Array, Value, dict, list 를 사용할 수 있습니다. 따라서 manager를 통해 프로세스가 사용할 수 있는 대부분의 기능을 사용할 수 있습니다.



Process Pool

프로세스에서는 pool을 만들어 작업을 분리하여 처리할 수 있습니다. 작업과 작업에 필요한 데이터를 pool에 등록하고, 사용할 프로세스의 개수를 입력하면 작업과 작업에 필요한 데이터를 나누어서 처리하게 됩니다.

import multiprocessing
import time

def print_initial_msg():						# initializer : Process가 생성되었을 시점에 실행
    print ("Start process : %s" % multiprocessing.current_process().name)

def worker(data):
    time.sleep(1)
    print("[%s] data : %s"%(multiprocessing.current_process().name, data))
    return data * 2

def main():
	# 프로세스의 개수 4개로 pool 생성
    pool = multiprocessing.Pool(processes=4, initializer=print_initial_msg)

    data_list = range(10)
    result = pool.map(worker, data_list)		# worker 함수와 data_list를 mapping

    pool.close()
    pool.join()

    print ("Result : %s" % result)

if __name__ == "__main__":
    main()

main프로세스에서 프로세스의 개수가 4개인 pool을 생성하였습니다. 프로세스가 생성되는 시점에 실행되는 initializer로는 프로세스의 정보를 출력하는 print_initial_msg() 함수로 설정하였고, map()을 통해 worker함수와 data_list를 매핑하여 실행하였습니다.

Start process : SpawnPoolWorker-1
Start process : SpawnPoolWorker-2
Start process : SpawnPoolWorker-3
Start process : SpawnPoolWorker-4
[SpawnPoolWorker-1] data : 0
[SpawnPoolWorker-2] data : 1
[SpawnPoolWorker-3] data : 2
[SpawnPoolWorker-4] data : 3
[SpawnPoolWorker-1] data : 4
[SpawnPoolWorker-2] data : 5
[SpawnPoolWorker-3] data : 6
[SpawnPoolWorker-4] data : 7
[SpawnPoolWorker-1] data : 8
[SpawnPoolWorker-2] data : 9
Result : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

총 4개의 프로세스가 생성되었고, 각 프로세스에서는 데이터들을 병렬적으로 처리하였습니다. 이와 같이 pool은 단순 반복 작업들을 프로세스를 활용하여 쉽고 빠르게 처리할 수 있도록 도와주어 빅데이터 처리나 연산을 빠르고 유용하게 도와줍니다.




reference

https://docs.python.org/ko/3/library/multiprocessing.html

0개의 댓글