Queue를_사용해서_스레드_사이의_작업_조율하라

매일 공부(ML)·2022년 7월 28일
0

이어드림

목록 보기
109/146

Queue 사용 X

  • 모든 작업이 다 끝났는지 검사를 하기 위해서 추가로 done_queue에 대해 바쁜 대기(busy waiting)를 수행

  • run메서드가 루프를 무한히 반복

    • 작업자 스레드에게 루프를 중단할 시점을 알려줄 방도가 없다.
  • 파이프라인 진행이 막힐 경우 프로그램이 임의로 중단된다.


Queue 사용 O

  • 새로운 데이터가 나타날 때까지 get 메서드가 블록되게 하여 작업자의 바쁜 대기 문제를 해결

    • get메서드가 반환할 원소가 생기기 전까지 스레드는 끝나지 않는다.
  • 파이프라인이 중간에 막히지 않는다.

    • 두 단계 사이에 허용 가능한 미완성 작업의 최대 개수 지정

    • 버퍼 크기를 정하면 큐가 이미 가득 찬 경우 put 블록

    • 센티널 원소를 추가하는 close 메서드 정의

  • join()메서드를 활용하면 병렬성을 노이게 되어서프로그램의 속도 증가

#위와 같은 문제를 queue 내장 모듈에 있는 Queue 클래스는 앞에서 설명한 모든 문제 해결

from queue import Queue
from threading import Thread
import time

my_queue = Queue()

def consumer():
    print('소비자 대기')
    my_queue.get() #다음에 보여줄 put()가 실행이 된 후에 실행
    print('소비자 완료')

thread = Thread(target=consumer)
thread.start()

print('생산자 데이터 추가')
my_queue.put(object())

print('생산자 완료')

소비자 대기
생산자 데이터 추가
생산자 완료소비자 완료


#큐 작업 진행 감시 및 적당한 횟수 호출

class CloseableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTTINEL:
                    return #스레드를 종료

                yield item

            finally:
                self.task_done()

            
class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)
def start_threads(count, *args):
    threads = [StoppableWorker(*args) for _ in range(count)]
    for thread in threads:
        thread.start()
    return threads

def stop_threads(closable_queue, threads):
    for _ in threads:
        closable_queue.close()

    closable_queue.join()

    for thread in threads:
        thread.join()

summary

  • 순차적인 작업을 동시에 여러 파이썬 스레드에서 실행되도록 조작하고 싶을 때, 특히 I/O위주의 프로그램인 경우라면 파이프라인이 매우 유용하다.

  • 동시성 파이프라인을 만들 때 발생할 수 있는 여러가지문제(바쁜 대기, 작업자에게 종료를 알리는 방법, 잠재적인 메모리 사용량 폭발 등)를 잘 알아두라.

  • Queue 클래스는 튼튼한 파이프라인을 구축할 때 필요한 기능인 블로킹 연산, 버퍼 크기 지정, join을 통한 완료 대기를 모두 제공

profile
성장을 도울 아카이빙 블로그

0개의 댓글