언제 써야해? 장점?
- 특정 작업을 thread <-> process 로 전환이 필요할 때 쉽게 변경할 수 있다.
- 한정된 리팩터링만으로 간단한 I/O 동시성 <-> 병렬성 전환을 활성화할 수 있다. (threadpoolexecutor <-> processpoolexecutor)
- futures 라는 것을 쓸 수 있음
- 아직 완료되지 않은 (혹은 완료되었는지 당장은 모르는) 작업을 외부에서 객체로 다룰 수 있게 된다.
- 예외를 디버깅하기 쉬움: result 메서드를 호출하면 스레드를 실행하는 중에 발생한 예외를 자동으로 전파시켜줌
- submit 1번만 해줘도, 알아서 max_workers 내에서 조절하여 여러 thread로 작업을 처리해 주는 것 같다. (확인 필요)
- pool의 장점을 취할 수 있다.(multiprocessing 패키지에서도 제공)
- 프로세스 혹은 스레드에 하나의 task만 배당하여 실시하는 것이 아니라, task가 끝나도 프로세스 혹은 스레드를 제거하지 않고, 다른 task를 이어서 수행할 수 있다.
- 즉, 삭제하고 재시작하는데 드는 비용을 줄일 수 있다.
단점?
- max_workers의 개수를 미리 지정해야 하므로 I/O 병렬성 혹은 병렬성을 제한한다.
- graceful shutdown을 위해 정밀한 설계 필요 (ctrl+c 한번으로 종료시키기 어렵다.)
- future이 완료되어야만 shutdown이 된다.
- 현재 실행 중인 future 은 취소할 수 없다.
concurrent.futures 메서드와 어트리뷰트
__PoolExecutor()
- executor = __PoolExecutor()
- "worker 스레드/프로세스를 관리하는 풀" + "작업을 분배하는 큐" + "결과를 수집하는 큐"를 관리합니다.
- ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
- 최대 max_workers 프로세스의 풀을 사용하여 호출을 비동기적으로 실행하는 Executor 서브 클래스.
- max_workers 가 None 이거나 주어지지 않았다면, 기계의 프로세서 수를 기본값으로 사용합니다.
- ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
- 최대 max_workers 스레드의 풀을 사용하여 호출을 비동기적으로 실행하는 Executor 서브 클래스.
- max_workers 가 None 이거나 주어지지 않았다면, 기본값으로 기계의 프로세서 수에 5 를 곱한 값을 사용합니다.
- ThreadPoolExecutor 가 CPU 작업보다는 I/O를 동시에 진행하는데 자주 쓰이고, 작업자의 수가 ProcessPoolExecutor 보다 많아야 한다고 가정하고 있습니다.
- executor.submit(fn, *args, **kwargs)
- 콜러블 fn 이 fn(*args **kwargs) 처럼 실행되도록 예약하고, 콜러블 객체의 실행을 나타내는 Future 객체를 반환합니다.
from concurrent.futures import ProcessPoolExecutor
from time import sleep
def process(*args):
result =
time.sleep(1)
print(result)
def main():
exe = ProcessPoolExecutor(max_workers=4) as exe:
while true:
cont, x = collect_arguments()
if not cont:
break
if x:
exe.submit(process, *x)
else:
sleep(0.1)
exe.shutdown(wait=True)
if __name__ == "__main__":
main()
- executor.map(func, *iterables, timeout=None, chunksize=1)
- iterables 는 느긋하게 처리되는 것이 아니라 즉시 수집됩니다.
- func 는 비동기적으로 실행되며 func 에 대한 여러 호출이 동시에 이루어질 수 있습니다.
- iterator을 반환합니다.
- 반환된 이터레이터는 next() 가 호출되었을 때, Executor.map() 에 대한 최초 호출에서 timeout 초 후에도 결과를 사용할 수 없는 경우 concurrent.futures.TimeoutError 를 발생시킵니다.
from concurrent import futures
from flags import save_flag, get_flag, main
def download_one(cc: str):
image = get_flag(cc)
save_flag(image, f'{cc}.gif')
print(cc, end=' ', flush=True)
return cc
def download_many(cc_list: list[str]) -> int:
with futures.ThreadPoolExecutor() as executor:
res = executor.map(download_one, sorted(cc_list))
return len(list(res))
if __name__ == '__main__':
main(download_many)
- executor.shutdown(wait=True, *, cancel_futures=False)
- 현재 계류 중인 퓨처가 실행 완료될 때, 사용 중인 모든 자원을 해제해야 한다는 것을 실행기에 알립니다.
- 종료(shutdown) 후에 이루어지는 Executor.submit() 과 Executor.map() 호출은 RuntimeError 를 발생시킵니다.
- wait
- True 면, pending 중인 모든 퓨처가 실행을 마치고 실행기와 관련된 자원이 해제될 때까지 이 메서드는 돌아오지 않습니다.
- wait 가 False 면, 이 메서드는 즉시 return하지만, 실행기와 연관된 자원은 pending 중인 모든 퓨처가 실행을 마칠 때 해제됩니다.
- wait 의 값과 관계없이, 모든 pending 중인 퓨처가 실행을 마칠 때까지 전체 파이썬 프로그램이 종료되지 않습니다.
- with 문을 사용하여 Executor를 종료시키면 (Executor.shutdown() 를 wait 값 True 로 호출한 것처럼 대기합니다), 이 메서드를 명시적으로 호출할 필요가 없어집니다.
- cancel_futures
- python 3.9에 cancel_futures가 추가되었습니다.
- True이면, 이 메서드는 실행기가 실행을 시작시키지 않은 pending 중인 모든 퓨처를 취소합니다.
- cancel_futures의 값과 관계없이 완료되었거나 실행 중인 퓨처는 취소되지 않습니다.
- cancel_futures와 wait가 모두 True이면, 이 메서드가 반환하기 전에 실행기가 실행을 시작한 모든 퓨처가 완료됩니다. 나머지 퓨처는 취소됩니다.
Future
- 콜러블 객체의 비동기 실행을 캡슐화합니다.
- Future 인스턴스는 Executor.submit() 에 의해 생성되며 테스트를 제외하고는 직접 생성되어서는 안 됩니다.
- future.result(timeout=None)
- 완료될 때까지 기다리고, 완료되면 결과 값을 return 합니다.
- future.exception(timeout=None)
- 완료될 때까지 기다리고, 완료되면 예외 값을 return 합니다.
- future.cancel()
- 호출 취소 시도
- 현재 실행 중이거나 실행 종료된 것들은 -> 호출 취소 못함
- future.running()
- True: 현재 실행 중이고, 취소할 수 없는 경우 입니다.
- future.done()
- future.cancelled()
- future.add_done_callback(fn)
- 콜러블 fn 을 퓨처에 연결합니다.
- fn 은 퓨처가 취소되거나 실행이 종료될 때 퓨처를 유일한 인자로 호출됩니다.
- future.set_result(result)
- Future와 관련된 작업 결과를 result 로 설정합니다.
- future.set_exception(exception)
- Future와 관련된 작업 결과를 Exception exception 으로 설정합니다.
모듈 함수
- concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
- fs 로 주어진 여러 (서로 다른 Executor 인스턴스가 만든 것들도 가능합니다) Future 인스턴스들이 완료할 때까지 기다립니다.
- wait는 특정 타임아웃 옵션을 설정해줌으로써 실행할 작업들을 순차적으로 병렬적으로 수행하는데, 해당 타임아웃을 넘어가는 작업물은 중단(pending) 시켜버린다.
- 따라서 특정 타임아웃 내에만 수행이 완료된 작업들만 반환된다. 또한 설정한 타임아웃 내에 수행된 작업들은 done, 수행되지 않은 작업들은 not_done 속성으로 관찰이 가능하다.
- 집합들의 이름있는 2-튜플을 돌려줍니다.
- done 이라는 이름의 첫 번째 집합은 대기가 끝나기 전에 완료된 (끝났거나 취소된) 퓨처를 담고 있습니다.
- not_done 이라는 이름의 두 번째 집합은 완료되지 않은 (계류 중이거나 실행 중인) 퓨처를 담고 있습니다.
- timeout 은 반환하기 전에 대기 할 최대 시간(초)을 제어하는 데 사용할 수 있습니다.
- timeout 은 int 또는 float가 될 수 있습니다. timeout 이 지정되지 않았거나 None 인 경우, 대기 시간에는 제한이 없습니다.
- return_when 은, 이 함수가 언제 반환되어야 하는지를 나타냅니다. 다음 상수 중 하나여야 합니다: FIRST_COMPLETED / FIRST_EXCEPTION / ALL_COMPLETED
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait
import time
WORK_LIST = [int(1e4), int(1e5), int(1e6), int(1e7)]
def sum_generator(n):
sum_val = sum(x for x in range(1, n+1))
return sum_val
def main():
worker = min(10, len(WORK_LIST))
st = time.time()
future_lst = []
with ProcessPoolExecutor(max_workers=worker) as executor:
for work in WORK_LIST:
future = executor.submit(sum_generator, work)
future_lst.append(future)
print('Scheduled for {}: {}'.format(work, future))
result = wait(future_lst, timeout=7)
print('Completed Task:', result.done)
print('Over timeout Task:', result.not_done)
print([res.result() for res in result.done])
print([res.result() for res in result.not_done])
if __name__ == '__main__':
main()
- concurrent.futures.as_completed(fs, timeout=None)
- fs 로 주어진 여러 (서로 다른 Executor 인스턴스가 만든 것들도 가능합니다) 퓨처들이 완료되는 대로 (끝났거나 취소된 퓨처) 일드 하는 Future 인스턴스의 이터레이터를 반환합니다.
- as_completed가 wait과 다른 점은 정의한 작업물들 순서에 상관없이 시간이 적게 걸리는 작업들부터 수행한다는 것이다.
- 다시 말해, 만약 [천만, 만, 백만, 십만] 에 대한 각각 누적합을 구하라고 한다면 wait는 리스트에 담겨있는 작업물 순차적으로 수행하지만
- as_completed는 가장 적게 걸릴 만(10,000)에 대한 누적합을 구하고 다음은 십만 -> 백만 -> 천만 이런 식으로 수행한다는 것이다.
- fs 에 중복된 퓨처가 들어있으면 한 번만 반환됩니다.
- as_completed() 가 호출되기 전에 완료한 모든 퓨처들이 먼저 일드 됩니다.
- 반환된 이터레이터는, next() 가 호출되고, as_completed() 호출 시점으로부터 timeout 초 후에 결과를 얻을 수 없는 경우 concurrent.futures.TimeoutError 를 발생시킵니다.
- timeout 은 int 또는 float가 될 수 있습니다. timeout 이 지정되지 않았거나 None 인 경우, 대기 시간에는 제한이 없습니다.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time
WORK_LIST = [int(1e5), int(1e6), int(1e7), int(1e8)]
def sum_generator(n):
sum_val = sum(x for x in range(1, n+1))
return sum_val
def main():
worker = min(10, len(WORK_LIST))
st = time.time()
future_lst = []
with ProcessPoolExecutor(max_workers=worker) as executor:
for work in WORK_LIST:
future = executor.submit(sum_generator, work)
future_lst.append(future)
for future in as_completed(future_lst, timeout=7):
result = future.result()
done = future.done()
cancelled = future.cancelled()
print('Result: {}, Done: {}'.format(result, done))
print('Cancelled: {}'.format(cancelled))
if __name__ == '__main__':
main()
Futures를 죽이는 방법?
방법 1
- 기본적으로, 작업자 스레드는 주 스레드가 종료되기 전에 완료되어야 합니다. 그렇지 않으면 나갈 수 없습니다.
- 일반적인 해결 방법은, 각 스레드가 더 작업을 수행해야 하는지 여부를 결정하기 위해 확인할 수 있는 글로벌 상태를 가지는 것이다.
from __future__ import print_function
import concurrent.futures
import time
import sys
quit = False
def wait_a_bit(name):
while not quit:
print("{n} is doing work...".format(n=name))
time.sleep(1)
def setup():
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
future1 = executor.submit(wait_a_bit, "Jack")
future2 = executor.submit(wait_a_bit, "Jill")
while (not (future1.done() and future2.done())):
time.sleep(1)
if __name__ == "__main__":
try:
setup()
except KeyboardInterrupt:
quit = True
방법 2 ?
- future.set_result(result)
- future.set_exception(exception)
multiprocessing.pool VS concurrent.futures.ProcessPoolExecutor
multiprocessing.pool
- 후자와 다르게, task를 취소할 능력이 없다.
- 후자와 다르게, 다른 종류의 tasks 조합과 함께 일할 능력이 없다.
- 후자와 다르게, 모든 tasks를 강제로 종료할 능력을 가지고 있다.
- 후자와 다르게, task에서 발생할 수 있는 예외에 접근할 수 있는 능력이 없다.
concurrent.futures.ProcessPoolExecutor
- 전자와 다르게, task를 취소할 능력이 있다.
- 전자와 다르게, 다른 종류의 tasks 조합과 함께 일할 능력이 있다.
- 전자와 다르게, 모든 tasks를 강제로 종료할 능력이 없다.
- 전자와 다르게, task에서 발생할 수 있는 예외에 접근할 능력이 있다.