concurrent.futures 모듈

About_work·2022년 12월 13일
0

process, thread

목록 보기
4/23

언제 써야해? 장점?

  • 특정 작업을 thread <-> process 로 전환이 필요할 때 쉽게 변경할 수 있다.
    • 한정된 리팩터링만으로 간단한 I/O 동시성 <-> 병렬성 전환을 활성화할 수 있다. (threadpoolexecutor <-> processpoolexecutor)
  • futures 라는 것을 쓸 수 있음
    • 아직 완료되지 않은 (혹은 완료되었는지 당장은 모르는) 작업을 외부에서 객체로 다룰 수 있게 된다.
    • 예외를 디버깅하기 쉬움: result 메서드를 호출하면 스레드를 실행하는 중에 발생한 예외를 자동으로 전파시켜줌
  • submit 1번만 해줘도, 알아서 max_workers 내에서 조절하여 여러 thread로 작업을 처리해 주는 것 같다. (확인 필요)
  • pool의 장점을 취할 수 있다.(multiprocessing 패키지에서도 제공)
    • 프로세스 혹은 스레드에 하나의 task만 배당하여 실시하는 것이 아니라, task가 끝나도 프로세스 혹은 스레드를 제거하지 않고, 다른 task를 이어서 수행할 수 있다.
    • 즉, 삭제하고 재시작하는데 드는 비용을 줄일 수 있다.

단점?

  • max_workers의 개수를 미리 지정해야 하므로 I/O 병렬성 혹은 병렬성을 제한한다.
  • graceful shutdown을 위해 정밀한 설계 필요 (ctrl+c 한번으로 종료시키기 어렵다.)
    • future이 완료되어야만 shutdown이 된다.
    • 현재 실행 중인 future 은 취소할 수 없다.

concurrent.futures 메서드와 어트리뷰트

__PoolExecutor()

  • executor = __PoolExecutor()
    • "worker 스레드/프로세스를 관리하는 풀" + "작업을 분배하는 큐" + "결과를 수집하는 큐"를 관리합니다.
    • ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
      • 최대 max_workers 프로세스의 풀을 사용하여 호출을 비동기적으로 실행하는 Executor 서브 클래스.
      • max_workers 가 None 이거나 주어지지 않았다면, 기계의 프로세서 수를 기본값으로 사용합니다.
    • ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
      • 최대 max_workers 스레드의 풀을 사용하여 호출을 비동기적으로 실행하는 Executor 서브 클래스.
      • max_workers 가 None 이거나 주어지지 않았다면, 기본값으로 기계의 프로세서 수에 5 를 곱한 값을 사용합니다.
      • ThreadPoolExecutor 가 CPU 작업보다는 I/O를 동시에 진행하는데 자주 쓰이고, 작업자의 수가 ProcessPoolExecutor 보다 많아야 한다고 가정하고 있습니다.
  • executor.submit(fn, *args, **kwargs)
    • 콜러블 fn 이 fn(*args **kwargs) 처럼 실행되도록 예약하고, 콜러블 객체의 실행을 나타내는 Future 객체를 반환합니다.
from concurrent.futures import ProcessPoolExecutor
from time import sleep

def process(*args):
    result = # 입력받은 인자들을 처리한다...
    time.sleep(1)
    print(result)

def main():
	exe = ProcessPoolExecutor(max_workers=4) as exe:
	while true:
    	cont, x = collect_arguments()
	    if not cont:
    	    break
	    if x:
    	    exe.submit(process, *x)
	    else:
    	    sleep(0.1)
	exe.shutdown(wait=True)
 
if __name__ == "__main__":
	main()
  • executor.map(func, *iterables, timeout=None, chunksize=1)
    • iterables 는 느긋하게 처리되는 것이 아니라 즉시 수집됩니다.
    • func 는 비동기적으로 실행되며 func 에 대한 여러 호출이 동시에 이루어질 수 있습니다.
    • iterator을 반환합니다.
    • 반환된 이터레이터는 next() 가 호출되었을 때, Executor.map() 에 대한 최초 호출에서 timeout 초 후에도 결과를 사용할 수 없는 경우 concurrent.futures.TimeoutError 를 발생시킵니다.
# flags_threadpool.py
from concurrent import futures
# Reuse functions in flags.py
from flags import save_flag, get_flag, main

# 하나의 국기 이미지를 다운받는 함수. 각 worker가 이 함수를 수행함
def download_one(cc: str):
    image = get_flag(cc)
    save_flag(image, f'{cc}.gif')
    print(cc, end=' ', flush=True)
    return cc

def download_many(cc_list: list[str]) -> int:
    # context manager로서 ThreadPoolExecutor를 인스턴스화 합니다.
    # executor.__exit__() 메소드는 executor.shutdown(wait=True)를 호출하는데,
    # 이는 스레드가 완료될 때까지 블락시킵니다.
    with futures.ThreadPoolExecutor() as executor:
        # map 메소드는 내장 함수 map과 유사합니다.
        # 첫 번째 인수인 download_one 함수는 여러 스레드에서 동시에 호출됩니다.
        # map 메소드는 각 함수 호출에서 리턴되는 값들을 반복할 수 있는 제너레이터를 반환합니다.
        # 여기서는 country code를 반환
        res = executor.map(download_one, sorted(cc_list))
    return len(list(res))

if __name__ == '__main__':
    # 이 스크립트에서 구현한 downlaod_many 콜러블을 인수로 전달하여,
    # flags.py에서 구현한 main 함수 호출
    main(download_many)
  • executor.shutdown(wait=True, *, cancel_futures=False)
    • 현재 계류 중인 퓨처가 실행 완료될 때, 사용 중인 모든 자원을 해제해야 한다는 것을 실행기에 알립니다.
    • 종료(shutdown) 후에 이루어지는 Executor.submit() 과 Executor.map() 호출은 RuntimeError 를 발생시킵니다.
    • wait
      • True 면, pending 중인 모든 퓨처가 실행을 마치고 실행기와 관련된 자원이 해제될 때까지 이 메서드는 돌아오지 않습니다.
      • wait 가 False 면, 이 메서드는 즉시 return하지만, 실행기와 연관된 자원은 pending 중인 모든 퓨처가 실행을 마칠 때 해제됩니다.
      • wait 의 값과 관계없이, 모든 pending 중인 퓨처가 실행을 마칠 때까지 전체 파이썬 프로그램이 종료되지 않습니다.
    • with 문을 사용하여 Executor를 종료시키면 (Executor.shutdown() 를 wait 값 True 로 호출한 것처럼 대기합니다), 이 메서드를 명시적으로 호출할 필요가 없어집니다.
    • cancel_futures
      • python 3.9에 cancel_futures가 추가되었습니다.
      • True이면, 이 메서드는 실행기가 실행을 시작시키지 않은 pending 중인 모든 퓨처를 취소합니다.
      • cancel_futures의 값과 관계없이 완료되었거나 실행 중인 퓨처는 취소되지 않습니다.
      • cancel_futures와 wait가 모두 True이면, 이 메서드가 반환하기 전에 실행기가 실행을 시작한 모든 퓨처가 완료됩니다. 나머지 퓨처는 취소됩니다.

Future

  • 콜러블 객체의 비동기 실행을 캡슐화합니다.
  • Future 인스턴스는 Executor.submit() 에 의해 생성되며 테스트를 제외하고는 직접 생성되어서는 안 됩니다.
  • future.result(timeout=None)
    • 완료될 때까지 기다리고, 완료되면 결과 값을 return 합니다.
  • future.exception(timeout=None)
    • 완료될 때까지 기다리고, 완료되면 예외 값을 return 합니다.
  • future.cancel()
    • 호출 취소 시도
      • 현재 실행 중이거나 실행 종료된 것들은 -> 호출 취소 못함
  • future.running()
    • True: 현재 실행 중이고, 취소할 수 없는 경우 입니다.
  • future.done()
    • True: 실행 완료 or 성공적 취소
  • future.cancelled()
    • True: 호출 취소 성공한 상태
  • future.add_done_callback(fn)
    • 콜러블 fn 을 퓨처에 연결합니다.
    • fn 은 퓨처가 취소되거나 실행이 종료될 때 퓨처를 유일한 인자로 호출됩니다.
  • future.set_result(result)
    • Future와 관련된 작업 결과를 result 로 설정합니다.
  • future.set_exception(exception)
    • Future와 관련된 작업 결과를 Exception exception 으로 설정합니다.

모듈 함수

  • concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
    • fs 로 주어진 여러 (서로 다른 Executor 인스턴스가 만든 것들도 가능합니다) Future 인스턴스들이 완료할 때까지 기다립니다.
    • wait는 특정 타임아웃 옵션을 설정해줌으로써 실행할 작업들을 순차적으로 병렬적으로 수행하는데, 해당 타임아웃을 넘어가는 작업물은 중단(pending) 시켜버린다.
    • 따라서 특정 타임아웃 내에만 수행이 완료된 작업들만 반환된다. 또한 설정한 타임아웃 내에 수행된 작업들은 done, 수행되지 않은 작업들은 not_done 속성으로 관찰이 가능하다.
    • 집합들의 이름있는 2-튜플을 돌려줍니다.
    • done 이라는 이름의 첫 번째 집합은 대기가 끝나기 전에 완료된 (끝났거나 취소된) 퓨처를 담고 있습니다.
    • not_done 이라는 이름의 두 번째 집합은 완료되지 않은 (계류 중이거나 실행 중인) 퓨처를 담고 있습니다.
    • timeout 은 반환하기 전에 대기 할 최대 시간(초)을 제어하는 데 사용할 수 있습니다.
    • timeout 은 int 또는 float가 될 수 있습니다. timeout 이 지정되지 않았거나 None 인 경우, 대기 시간에는 제한이 없습니다.
    • return_when 은, 이 함수가 언제 반환되어야 하는지를 나타냅니다. 다음 상수 중 하나여야 합니다: FIRST_COMPLETED / FIRST_EXCEPTION / ALL_COMPLETED
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait
import time

WORK_LIST = [int(1e4), int(1e5), int(1e6), int(1e7)]

# 누적 합 계산 함수
def sum_generator(n):
    sum_val = sum(x for x in range(1, n+1))
    return sum_val


def main():
    worker = min(10, len(WORK_LIST))

    st = time.time()

    future_lst = []
    with ProcessPoolExecutor(max_workers=worker) as executor:
        # 작업할 것들에 대해서
        for work in WORK_LIST:
            # submit으로 작업이 실행되도록 예약하고 객체의 실행을 나타내는 Future객체를 반환
            future = executor.submit(sum_generator, work)
            # future객체를 리스트에 담기
            future_lst.append(future)
            print('Scheduled for {}: {}'.format(work, future))

        # wait은 타임아웃 설정할 수 있는데, 해당 타임 초과되는 작업물들을 pending 됨
        result = wait(future_lst, timeout=7)
        print('Completed Task:', result.done)
        print('Over timeout Task:', result.not_done)
        # 수행된 결과물만 출력
        print([res.result() for res in result.done])
        # 수행되지 않은 결과물만 출력
        print([res.result() for res in result.not_done])


if __name__ == '__main__':
    main()
  • concurrent.futures.as_completed(fs, timeout=None)
    • fs 로 주어진 여러 (서로 다른 Executor 인스턴스가 만든 것들도 가능합니다) 퓨처들이 완료되는 대로 (끝났거나 취소된 퓨처) 일드 하는 Future 인스턴스의 이터레이터를 반환합니다.
    • as_completed가 wait과 다른 점은 정의한 작업물들 순서에 상관없이 시간이 적게 걸리는 작업들부터 수행한다는 것이다.
    • 다시 말해, 만약 [천만, 만, 백만, 십만] 에 대한 각각 누적합을 구하라고 한다면 wait는 리스트에 담겨있는 작업물 순차적으로 수행하지만
    • as_completed는 가장 적게 걸릴 만(10,000)에 대한 누적합을 구하고 다음은 십만 -> 백만 -> 천만 이런 식으로 수행한다는 것이다.
    • fs 에 중복된 퓨처가 들어있으면 한 번만 반환됩니다.
    • as_completed() 가 호출되기 전에 완료한 모든 퓨처들이 먼저 일드 됩니다.
    • 반환된 이터레이터는, next() 가 호출되고, as_completed() 호출 시점으로부터 timeout 초 후에 결과를 얻을 수 없는 경우 concurrent.futures.TimeoutError 를 발생시킵니다.
    • timeout 은 int 또는 float가 될 수 있습니다. timeout 이 지정되지 않았거나 None 인 경우, 대기 시간에는 제한이 없습니다.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time

WORK_LIST = [int(1e5), int(1e6), int(1e7), int(1e8)]

# 누적 합 계산 함수
def sum_generator(n):
    sum_val = sum(x for x in range(1, n+1))
    return sum_val


def main():
    worker = min(10, len(WORK_LIST))

    st = time.time()

    future_lst = []
    with ProcessPoolExecutor(max_workers=worker) as executor:
        for work in WORK_LIST:
            future = executor.submit(sum_generator, work)
            future_lst.append(future)
        
        # as_completed
        for future in as_completed(future_lst, timeout=7):
            # 가장 적게 걸리는 작업물부터 수행
            result = future.result()
            # 수행 완료된 작업물
            done = future.done()
            # 타임아웃을 벗어나 수행되지 않은 작업물
            cancelled = future.cancelled()
            print('Result: {}, Done: {}'.format(result, done))
            print('Cancelled: {}'.format(cancelled))



if __name__ == '__main__':
    main()

Futures를 죽이는 방법?

방법 1

  • 기본적으로, 작업자 스레드는 주 스레드가 종료되기 전에 완료되어야 합니다. 그렇지 않으면 나갈 수 없습니다.
  • 일반적인 해결 방법은, 각 스레드가 더 작업을 수행해야 하는지 여부를 결정하기 위해 확인할 수 있는 글로벌 상태를 가지는 것이다.
#!/usr/bin/env python
from __future__ import print_function

import concurrent.futures
import time
import sys

quit = False
def wait_a_bit(name):
    while not quit:
        print("{n} is doing work...".format(n=name))
        time.sleep(1)

def setup():
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    future1 = executor.submit(wait_a_bit, "Jack")
    future2 = executor.submit(wait_a_bit, "Jill")

    # main thread must be doing "work" to be able to catch a Ctrl+C 
    # http://www.luke.maurits.id.au/blog/post/threads-and-signals-in-python.html
    while (not (future1.done() and future2.done())):
        time.sleep(1)

if __name__ == "__main__":
    try:
        setup()
    except KeyboardInterrupt:
        quit = True

방법 2 ?

  • future.set_result(result)
  • future.set_exception(exception)

multiprocessing.pool VS concurrent.futures.ProcessPoolExecutor

multiprocessing.pool

  • 후자와 다르게, task를 취소할 능력이 없다.
  • 후자와 다르게, 다른 종류의 tasks 조합과 함께 일할 능력이 없다.
  • 후자와 다르게, 모든 tasks를 강제로 종료할 능력을 가지고 있다.
  • 후자와 다르게, task에서 발생할 수 있는 예외에 접근할 수 있는 능력이 없다.

concurrent.futures.ProcessPoolExecutor

  • 전자와 다르게, task를 취소할 능력이 있다.
  • 전자와 다르게, 다른 종류의 tasks 조합과 함께 일할 능력이 있다.
  • 전자와 다르게, 모든 tasks를 강제로 종료할 능력이 없다.
  • 전자와 다르게, task에서 발생할 수 있는 예외에 접근할 능력이 있다.
profile
새로운 것이 들어오면 이미 있는 것과 충돌을 시도하라.

0개의 댓글