1. 언제 써야해? 장점?
- 한줄 요약: 10개의 주방 / 10개의 요리사 / 10개 해야할 요리
- parallel 을 확보하기 위함 ( CPU를 많이 쓰고, I/O waiting 이 적은 테스크에 적합)
- pool을 쓸 수 있습니다.
- 공유 메모리를 사용하는 더 고급스러운 기능을 제공합니다.(다른 패키지와 비교했을 때)
- 하지만, 이러한 고급 기능을 사용하는 것은 매우 복잡하다.
- 이 고급스러운 기능 사용은 다른 모든 패키지들을 사용해보고도 안되면, 제일 마지막 단계에 시도하라.
2. 단점?
- 해당 모듈로 짠 process를 thread로 바꾸기 위해서는 힘든 리펙토링이 들어갑니다.
3. 메서드와 어트리뷰트
import os
from multiprocessing import Process
num = 42
def f(name):
global num
num += 1
print('pid of parent:', os.getppid())
print('pid of %s : %d' %(name, os.getpid()))
print('%d' %num)
if __name__ == '__main__':
print('pid of main:', os.getpid())
p1 = Process(target=f, args=("proc_1",))
p2 = Process(target=f, args=("proc_2",))
p1.start(); p1.join()
p2.start(); p2.join()
- p = mp.Process(target=None, name=None, args=(), kwargs={}, *, daemon=None)
- daemon 인자는 프로세스 daemon 플래그를 True 또는 False 로 설정합니다. None (기본값) 이면, 이 플래그는 만드는 프로세스로부터 상속됩니다.
- p.run()
- 프로세스의 활동을 나타내는 메서드.
- 서브 클래스에서 이 메서드를 재정의할 수 있습니다.
- 표준 run() 메서드는 객체의 생성자에 target 인자로 전달된 콜러블 객체를 호출하는데 (있다면) args 와 kwargs 인자를 각각 위치 인자와 키워드 인자로 사용합니다.
- p.start()
- 프로세스의 활동을 시작합니다.
- 이것은 프로세스 객체 당 최대 한 번 호출되어야 합니다.
- 객체의 run() 메서드가 별도의 프로세스에서 호출되도록 합니다.
- p.join([timeout])
프로세스의 실행이 완료될 때까지 기다립니다.
- 프로세스가 시작된 후 (start() 메소드 호출 이후) 실행을 마칠 때까지 기다리는 데 사용됩니다.
- 선택적 인자 timeout 이 None (기본값) 인 경우, 메서드는 join() 메서드가 호출된 프로세스가 종료될 때까지 블록 됩니다.
- timeout 이 양수면 최대 timeout 초 동안 블록 됩니다.
- 이 메서드는 프로세스가 종료되거나 메서드가 시간 초과 되면 None 을 돌려줌에 주의해야 합니다.
- 프로세스의 exitcode 를 검사하여 종료되었는지 확인하십시오.
- 프로세스는 여러 번 조인할 수 있습니다.
- 교착 상태를 유발할 수 있으므로 프로세스는 자신을 조인할 수 없습니다.
- 프로세스가 시작되기 전에 프로세스에 조인하려고 하면 에러가 발생합니다.
- p.terminate()
- 해당 프로세스는 현재 실행 중인 작업을 완료하지 않고 강제로 종료
- 이는 데드락(deadlock) 상태에 빠진 프로세스를 종료시키거나,
- 더 이상 필요하지 않은 작업을 실행 중인 프로세스를 종료시킬 때 유용
- 프로세스가 정리 코드를 실행하지 못하도록 하므로, 열린 파일이나 네트워크 연결과 같은 리소스가 제대로 정리되지 않을 수 있습니다.
- 가능한 한 이 메소드의 사용을 피하고, 프로세스가 자체적으로 종료할 수 있는 방법을 마련하는 것이 좋음
- p.close()
- 호출하면 더 이상 해당 프로세스 객체를 사용하여 새 작업을 시작할 수 없습니다.
close()
메서드를 실행하면 아래 "리소스" 가 정리됩니다.
- 프로세스 핸들과 관리 리소스:
- 프로세스를 생성하고 관리할 때, 운영 체제는 이를 위한 리소스를 할당
프로세스의 식별자
, 상태 정보
, 그리고 프로세스 제어를 위한 핸들
등이 포함
multiprocessing
내부 리소스:
- 기존:프로세스 간 통신(IPC)을 위해 파이프, 큐, 공유 메모리 등의 메커니즘을 제공
- 이러한 메커니즘을 구현하기 위해 모듈 내부적으로 다양한 리소스가 사용됨
close()
메서드는 이러한 내부 리소스의 정리에 도움을 줍니다.
- 그러나, 프로세스가 직접 열었던 파일이나 네트워크 연결과 같은 리소스는
close()
메서드의 책임 범위 밖
- 이러한 리소스는 프로세스 내에서 명시적으로 관리되어야 하며, 프로세스의 코드 내에서 적절히 열고 닫아야 합니다.
- 예를 들어, 파일을 열어 작업한 후에는 해당 파일을 명시적으로 닫거나, 네트워크 연결을 종료하는 등의 작업이 필요합니다.
close()
메서드는 프로세스를 종료시키지 않으며, 프로세스가 실행 중인 상태에서는 해당 프로세스의 실행을 중단시키지 않음
close()
의 주된 목적은 프로세스 객체와 관련된 시스템 리소스를 해제하는 것이지, 실행 중인 프로세스의 작업을 중단시키는 것이 아닙니다.
- 따라서 프로세스 내부에서 무한 루프(
while True:
등)가 돌고 있을 경우, close()
를 호출한다고 해서 해당 루프가 중단되거나 프로세스가 종료되지 않습니다.
- 실행 중인 프로세스를 종료하려면,
terminate()
메서드를 사용해야 합니다.
- proc = mp.current_process()
- 현재 실행되는 프로세스에 대한 정보를 담고 있는 객체를 얻을 수 있다.
- 현재 프로세스에 해당하는 Process 객체를 반환합니다.
- mp.parent_process() 도 있다.
- proc.name / proc.pid
- pid는 OS가 각 프로세스에게 부여한 고유 번호로써, 프로세스의 우선 순위를 조정하거나 종료하는 등 다양한 용도로 사용됩니다.
- mp.set_start_method('__')
- 'spawn'
- 상위 프로세스는 새로운 파이썬 인터프리터 프로세스를 시작한다.
- 하위 프로세스는 프로세스 개체의 run() 메서드를 실행하는 데 필요한 리소스만 상속합니다.
- 특히 상위 프로세스의 불필요한 파일 설명자 및 핸들은 상속되지 않습니다.
- 이 방법을 사용하여 프로세스를 시작하는 것은 포크 또는 포크 서버를 사용하는 것에 비해 다소 느립니다.
- 유닉스 및 윈도우에서 사용 가능합니다. 윈도우와 macOS의 기본값.
- 부모 프로세스가 OS에 요청하여 Child process를 새로 만들어내는 것을 spawning 이라고 부릅니다.
- fork
- 부모 프로세스는 os.fork() 를 사용하여 파이썬 인터프리터를 포크 합니다.
- 자식 프로세스는, 시작될 때, 부모 프로세스와 실질적으로 같습니다.
- 부모의 모든 자원이 자식 프로세스에 의해 상속됩니다.
- 다중 스레드 프로세스를 안전하게 포크 하기 어렵다는 점에 주의하십시오.
- 유닉스에서만 사용 가능합니다. 유닉스의 기본값.
- frokserver
- 프로그램이 시작되고 forkserver 시작 방법을 선택하면, 서버 프로세스가 시작됩니다.
- 그 이후부터, 새로운 프로세스가 필요할 때마다, 부모 프로세스는 서버에 연결하여 새로운 프로세스를 포크 하도록 요청합니다.
- 포크 서버 프로세스는 단일 스레드이므로 os.fork() 를 사용하는 것이 안전합니다.
- 불필요한 자원은 상속되지 않습니다.
- 유닉스 파이프를 통해 파일 기술자를 전달할 수 있는 유닉스 플랫폼에서 사용할 수 있습니다.
import time
from multiprocessing import Pool
def count(process_name):
for i in range(1, 100001):
print(process_name, i)
if __name__=="__main__":
start_time = time.time()
p_list = ["proc_1", "proc_2", "proc_3", "proc_4"]
pool = Pool(processes = 4)
pool.map(count, p_list)
pool.close()
pool.join()
print(time.time() - start_time)
- from multiprocessing import pool
- pool = Pool(processes=4)
- pool 의 장점
- 요청이 올 때마다 생성하는 것보다, 생성 시간 비용을 줄일 수 있다.
- process의 총량을 관리할 수 있습니다.
- task의 생성과, task의 실행을 분리할 수 있습니다.
- 작업을 제출할 수 있는 작업자 프로세스 풀을 제어하는 프로세스 풀 객체.
- 제한 시간과 콜백을 사용하는 비동기 결과를 지원하고 병렬 map 구현을 제공합니다.
- processes 는 사용할 작업자 프로세스 수입니다.
- processes 가 None 이면 os.cpu_count() 에 의해 반환되는 수가 사용됩니다.
- with문을 pool과 함꼐 사용하면, enter 엔 Pool이 생성되고, exit 엔 terminate()가 호출됩니다.
- initializer 가 None 이 아니면, 각 작업자 프로세스는 시작할 때 initializer(*initargs) 를 호출합니다.
- pool.apply_async(func[, args[, kwds[, callback[, error_callback]]]])
- 인자 args 및 키워드 인자 kwds 를 사용하여 func 를 호출합니다.
- get()을 호출하여 결과물을 받기 위해선, 그 작업이 끝날 때까지 기다려야 한다.
- AsyncResult 를 반환합니다.
from multiprocessing import Pool
import multiprocessing as mp
import time
import os
def func(num):
c_proc = mp.current_process()
print("Running on Process",c_proc.name,"PID",c_proc.pid)
time.sleep(1)
print("Ended",num,"Process",c_proc.name)
return num
if __name__ == '__main__':
p = Pool(4)
start = time.time()
ret1 = p.apply_async(func,(1,))
ret2 = p.apply_async(func,(2,))
ret3 = p.apply_async(func,(3,))
ret4 = p.apply_async(func,(4,))
ret5 = p.apply_async(func,(5,))
print(ret1.get(),ret2.get(),ret3.get(),ret4.get(),ret5.get())
delta_t = time.time()-start
print("Time :",delta_t)
p.close()
p.join()
-
-
pool.map_async(unc, iterable[, chunksize[, callback[, error_callback]]])
- map_async()도 apply_async()와 동일하게 AsyncResult를 반환받는다.
- map은 작업이 끝나기 이전에 메인 프로세스의 다음 줄의 코드들을 실행할 수 없지만, map_async()는 AsyncResult의 get()을 호출하기 이전까지는 작업이 완전히 끝나지 않아도 메인프로세스의 다음 코드들을 실행할 수 있다.
- 하지만 하나의 iterable 인자만 지원합니다, 여러 이터러블에 대해서는 starmap_async()을 참조하십시오.
-
pool.AsyncResult
- Pool.apply_async()와 Pool.map_async() 에 의해 반환되는 결과의 클래스.
- get([timeout])
- 결과가 도착할 때 반환합니다.
- timeout 이 None 이 아니고 결과가 timeout 초 내에 도착하지 않으면 multiprocessing.TimeoutError 가 발생합니다.
- wait([timeout])
- 결과가 사용 가능할 때까지 또는 timeout 초가 지날 때까지 기다립니다.
- ready()
- successful()
- 예외를 발생시키지 않고 호출이 완료되었는지를 돌려줍니다.
- 결과가 준비되지 않았으면 ValueError 를 발생시킵니다.
from multiprocessing import Pool
import multiprocessing as mp
import time
import os
def func(num):
c_proc = mp.current_process()
print("Running on Process",c_proc.name,"PID",c_proc.pid)
time.sleep(1)
print("Ended",num,"Process",c_proc.name)
return num
if __name__ == '__main__':
p = Pool(4)
start = time.time()
ret = p.map_async(func,[1,2,3,4,5])
print("is 'ret' ready? :",ret.ready())
print(ret.get())
delta_t = time.time() - start
print("Time :",delta_t)
p.close()
p.join()
- callback은 어떻게 써? (apply_async, map_async에서)
from multiprocessing import Pool
import multiprocessing as mp
import time
import os
def callback_func(result):
print("callback_func got result :",result)
def square(x):
return x*x
if __name__ == '__main__':
with Pool(4) as p:
result = p.map_async(square,range(11),callback=callback_func)
result.wait()
callback_func got result : [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
- pool.close()
- 더는 작업이 풀에 제출되지 않도록 합니다.
- 모든 작업이 완료되면 작업자 프로세스가 종료됩니다.
- pool.terminate()
- 계류 중인 작업을 완료하지 않고 즉시 작업자 프로세스를 중지합니다.
- pool.join()
- 작업자 프로세스가 종료될 때까지 기다립니다.
- join() 호출 전에 반드시 close() 나 terminate()를 호출해야합니다 .
4. multiprocessing.pool VS concurrent.futures.ProcessPoolExecutor
4.1. multiprocessing.pool
- 후자와 다르게, task를 취소할 능력이 없다.
- 후자와 다르게, 다른 종류의 tasks 조합과 함께 일할 능력이 없다.
- 후자와 다르게, 모든 tasks를 강제로 종료할 능력을 가지고 있다.
- 후자와 다르게, task에서 발생할 수 있는 예외에 접근할 수 있는 능력이 없다.
4.2. concurrent.futures.ProcessPoolExecutor
- 전자와 다르게, task를 취소할 능력이 있다.
- 전자와 다르게, 다른 종류의 tasks 조합과 함께 일할 능력이 있다.
- 전자와 다르게, 모든 tasks를 강제로 종료할 능력이 없다.
- 전자와 다르게, task에서 발생할 수 있는 예외에 접근할 능력이 있다.
5. Inter Process Communication(IPC)
5.1. multiprocessing.Manager()
5.2. multiprocessing.Queue([maxsize])
-
언제쓰는가?
- native python objects를 쓸 때 가장 흔하고 단순하게 쓴다.
- 왜냐하면 전송 전에 패키지화 되고, 전송 후에는 unpackage화 되기 때문이다.
- 여러 생산자와 소비자를 쓰고 싶으면(two points 이상으로 의사소통 하고 싶으면) 써라.
-
파이프와 몇 개의 록/세마포어를 사용하여 구현된 프로세스 공유 큐를 반환합니다.
-
프로세스가 처음으로 항목을 큐에 넣으면 버퍼에서 파이프로 객체를 전송하는 피더 스레드가 시작됩니다.
-
제한 시간 초과를 알리기 위해 표준 라이브러리의 queue 모듈에서 정의되는 queue.Empty 와 queue.Full 예외를 일으킵니다.
-
-
Queue.put()
-
Queue.get()
from multiprocessing import Process, Queue
sentinel = -1
def creator(data, q):
// Creates data to be consumed and waits for the consumer
// to finish processing
print('Creating data and putting it on the queue')
for item in data:
q.put(item)
def my_consumer(q):
// Consumes some data and works on it
// In this case, all it does is double the input
while True:
data = q.get()
print('data found to be processed: {}'.format(data))
processed = data * 2
print(processed)
if data is sentinel:
break
if __name__ == '__main__':
q = Queue()
data = [5, 10, 13, -1]
process_one = Process(target=creator, args=(data, q))
process_two = Process(target=my_consumer, args=(q,))
process_one.start()
process_two.start()
q.close()
q.join_thread()
process_one.join()
process_two.join()
- Queue.cancel_join_thread()
- 기본적으로, multiprocessing.Queue 객체를 만들 때는 join_thread 파라미터가 True로 설정
- 이는 큐 객체가 소멸될 때 큐를 처리하는 스레드가 종료될 때까지 기다리는 것을 의미
- cancel_join_thread() 메서드를 사용하면 프로그램 종료시 더 빠른 종료가 가능해지며, 대신에 큐가 처리되지 않을 수 있다는 점을 유의해야 합니다.
5.3. multiprocessing.Pipe([duplex])
- 언제 쓰는가?
- python과 다른 언어 간에 공유하는 경우 protocol buffer + pipe를 사용하거나, Redis를 사용할 수 있다.
- 2 endpoints 로 충분하면 써라. 왜냐면 pipe가 queue 보다 훨씬 빠르기 때문이다. (Queue는 Pipe를 기반으로 만들어졌다.)
- 파이프의 끝을 나타내는 Connection 객체 쌍 (conn1, conn2) 를 반환합니다.
- duplex 가 True (기본값) 면 파이프는 양방향입니다.
- duplex 가 False 인 경우 파이프는 단방향입니다.
- conn1 은 메시지를 받는 데에만 사용할 수 있고, conn2 는 메시지를 보낼 때만 사용할 수 있습니다.
- 메서드 종류
poll(timeout=None)
- 파이프로부터 데이터를 수신할 준비가 되었는지 확인합니다.
- 이 메서드는 파이프에 새로운 데이터가 도착하면 True를 반환하고, 그렇지 않으면 False를 반환합니다.
- blocking 되지 않음
- timeout은 초 단위, None일 경우 무한 대기.
recv(buffersize=None, flags=0)
- recv() 메서드는 파이프로부터 데이터를 수신합니다.
- 이 메서드는 데이터가 도착하기 전까지 블로킹됩니다.
- 따라서, poll() 메서드를 사용하여 데이터 수신 준비가 된 상태인지 확인한 후, recv() 메서드를 호출해야 합니다.
- 이 메서드는 기본적으로 파이프에서 데이터를 읽을 때까지 대기합니다.
- 만약 파이프에서 데이터를 읽을 때 예외가 발생하면 EOFError를 발생시킵니다.
- buffersize
- 수신할 데이터의 최대 크기, None = 버퍼 크기 제한 x
- flags: 수신할 데이터에 대한 flag 지정.
send(obj, timeout=None, bufsize=-1)
- send() 메서드는 파이프를 통해 데이터를 송신합니다.
- 이 메서드는 기본적으로 파이프가 가득 찰 때까지 블로킹됩니다.
- 따라서, 데이터를 전송하기 전에 파이프의 버퍼 크기를 확인하고, 버퍼에 충분한 공간이 있는지 먼저 확인하는 것이 좋습니다.
- send() 메서드는 파이프 객체에 직렬화되지 않은 객체를 전송할 수 있으며, multiprocessing 모듈 내에서 객체를 직렬화하고 전송하는 과정은 자동으로 처리됩니다.
- parameter
- obj
- 전송할 객체입니다.
- 이 객체는 파이프에 저장될 때 자동으로 직렬화됩니다.
- timeout
- 파이프가 가득 찬 경우 블로킹될 때 대기할 시간을 지정합니다.
- 이 값은 초 단위로 지정하며, 기본값은 None입니다.
- None일 경우 무한대기를 의미합니다.
- bufsize
- 객체를 전송할 때 사용되는 버퍼의 크기를 지정합니다.
- 이 값은 바이트 단위로 지정하며, 기본값은 -1입니다.
- -1일 경우, 시스템의 기본 버퍼 크기를 사용합니다.
from multiprocessing import Process, Pipe, current_process
import time
import os
def worker(id, baseNum, conn):
process_id = os.getpid()
process_name = current_process().name
sub_total = 0
for _ in range(baseNum):
sub_total += 1
conn.send(sub_total)
conn.close()
print(f"Process ID: {process_id}, Process Name: {process_name}")
print(f"*** Result : {sub_total}")
def main():
parent_process_id = os.getpid()
print(f"Parent process ID {parent_process_id}")
start_time = time.time()
parent_conn, child_conn = Pipe()
t = Process(target=worker, args=(1, 100000000, child_conn))
t.start()
t.join()
print("--- %s seconds ---" % (time.time() - start_time))
print()
print("Main-Processing : {}".format(parent_conn.recv()))
print("Main-Processing Done!")
if __name__ == "__main__":
main()
5.4. SharedMemory(name=None, create=False, size=0)
5.5. Event