threading 모듈

About_work·2022년 12월 12일
0

process, thread

목록 보기
6/23

언제 써야해? 장점?

  • 한줄요약: 1개의 주방 / 10개의 요리사 / 10개 해야할 요리
  • blocking I/O 를 해결할 때 사용한다.
  • CPU를 적게 쓰고, I/O waiting 이 많은 테스크에 적합
  • 코드를 가급직 손보지 않고, 간편한 모듈

단점

  • 메모리 lock에 신경을 많이 써야합니다.
  • pool의 기능이 없다. (완전히 확실하진 않다.)
    • 스레드를 시작하고 실행하는데 비용이 든다.
    • 동시성 작업마다 스레드를 하나씩 실행하면, 스레드 개수가 많아지면 최악의 결과를 낳는다.
    • 스레드 삭제하고 다시 만드는게, 꼭 필요한지를 따져보고 진행하자.
  • 스레드를 시작하거나 종료하기를 기다리는 코드(예: 메인프로세스)에게,
    • 스레드 실행 중에 발생한 예외를 돌려주는 파이썬 내장 기능은 없다.
      • 이로 인해 스레드 디버깅이 어렵다.
      • TODO: traceback을 써야할듯? (해봐야 암)

사용시 주의 사항?

  • 스레드 하나당 8MB의 메모리가 필요
  • 파이썬에서는 메인 스레드에서 발생한 예외가 자동으로 다른 자식 스레드에게 전달되지 않습니다.
    • 각 스레드는 독립적으로 실행되며, 메인 스레드에서 발생한 예외는 메인 스레드에서만 처리됩니다.
    • 자식 스레드가 이러한 예외를 알거나 이에 반응하려면 명시적으로 이를 전달하거나 통신하는 메커니즘을 사용해야 합니다.
    • Ctrl + C (KeyboardInterrupt)에 대해, OS는 주 프로세스(? 확인 필요)의 main thread에만, 이 SIGINT 신호를 전달하고
      • 자식 thread에 대해서는 SIGINT 신호가 전달되지 않는다.
  • 자식 스레드가 메인 스레드에서 발생한 예외를 감지하고 종료할 수 있도록 하려면, 이벤트 플래그와 같은 메커니즘을 사용할 수 있습니다.

threading 모듈 객체

  • Thread: 단일 실행 스레드를 만드는 객체
  • Lock: 기본적인 Lock 객체
  • RLock: 재진입 가능한 락객체. 이미 획득한 락을 다시 획득할 수 있다.
  • Condition: 다른 스레드에서 신호를 줄 때까지 기다릴 수 있는 컨디션 변수 객체
  • Event: 컨디션 변수의 일반화 버전
  • Semaphore: 정해놓은 갯수만큼의 스레드를 허용하는 동기화 객체 (예를들어 최대 50개 까지만 동시에 실행)
  • Timer: Thread와 비슷하지만, 실행되기 전에 지정된 시간 동안 대기
  • Barrier: 스레드들이 계속 진행 할 수 있으려면, 지정된 숫자의 스레드가 해당 지점까지 도달해야하게 만듦

Thread 생성

  • 메인 스레드가 아래를 통해 thread를 생성(fork)한다.
Thread(name=, target=, args= , kwargs= , *, daemon=)
  • name: 로깅 등을 위한 용도로 쓰며, 주지 않아도 무방
  • target: 스레드에서 실행할 함수
  • args: tuple 형식으로 넘겨줘야 함
  • kwargs: dict 형식으로 넘겨줘야 함
  • daemon: 데몬으로 실행되는 스레드는 프로세스가 종료될 때 즉각 중단된다.

데몬 스레드

  • 백그라운드에서 실행되는 스레드.
  • 메인스레드가 종료되면 즉시 종료되는 스레드
  • 만약 데몬 스레드가 아니면, 해당 서브스레드는 메인 스레드가 종료되더라도 자신의 작업이 끝날 때까지 계속 실행된다.

Thread 클래스 method/arrtibutes

import threading
import time
from typing import List

shared_number = 0
lock = threading.Lock()  # threading에서 Lock 함수 가져오기

def thread_1(number: int) -> None:
    """
    주어진 수만큼 shared_number를 증가시킵니다.

    Args:
        number (int): 증가시킬 수

    """
    global shared_number
    print("number =", end=""), print(number)
    
    lock.acquire()  # 작업이 끝나기 전까지 다른 쓰레드가 공유데이터 접근을 금지
    for i in range(number):
        shared_number += 1
    lock.release()  # lock 해제

def thread_2(number: int) -> None:
    """
    주어진 수만큼 shared_number를 증가시킵니다.

    Args:
        number (int): 증가시킬 수

    """
    global shared_number
    print("number =", end=""), print(number)

    lock.acquire()  # thread_2 잠금
    for i in range(number):
        shared_number += 1
    lock.release()  # thread_2 해제

def main() -> None:
    """
    두 개의 스레드를 생성하여 동시에 실행합니다.
    각 스레드는 shared_number를 주어진 수만큼 증가시킵니다.
    실행 시간을 출력하고 최종 shared_number 값을 출력합니다.
    """
    global shared_number
    threads: List[threading.Thread] = []

    start_time = time.time()
    t1 = threading.Thread(target=thread_1, args=(50000000,))
    t1.start()
    threads.append(t1)

    t2 = threading.Thread(target=thread_2, args=(50000000,))
    t2.start()
    threads.append(t2)

    for t in threads:
        t.join()

    print("--- %s seconds ---" % (time.time() - start_time))

    print("shared_number=", end=""), print(shared_number)
    print("end of main")

if __name__ == "__main__":
    main()
  • start()
    • 스레드를 시작한다.
    • 즉시 리턴한다. (non-blocking)
  • join(timeout=)
    • 해당 thread에서 실행되는 함수가 종료될 때까지 기다린다.
    • timeout에 따라 특정 시간까지만 기다리게 할 수 있다.
    • 타임아웃을 초과해도 예외를 일으키지 않고 None을 return 하므로, 이 경우 is_alive()를 호출하여 스레드가 실행 중인지를 파악할 필요가 있다.
  • is_alive()
    • 해당 스레드가 동작 중인지 확인
  • name
    • 스레드의 이름
  • ident
    • 스레드 식별자, 정수값
  • native_id
    • 스레드 고유 식별자. ident는 종료된 스레드 이후에 새로 만들어진 다른 스레드에 재활용될 수 있다.

주의점

  • Thread.start()는 즉시 리턴하기 때문에, worker 스레드들이 동작하고 있는 중에 main thread가 적절히 기다려주지 않는다면 프로그램이 중간에 끝날 수 있기 때문에 유의해야 한다.
  • 프로세스의 종료 시점은 메인 스레드가 종료 지점에 도달했을 때이며, 다른 워크 스레드의 실행 여부는 고려되지 않는다.
  • 따라서 중도 종료를 막기 위해서는 join() 메서드를 이용하여 메인 스레드가 워커 스레드들을 기다리도록 하여야 한다.

threading.Thread로부터 파생된 파생클래스를 작성하여 사용하는 방식을 활용하라.

Queue 모듈을 사용할 수 있다.

  • 언제? 주어진 task가 하나도 빠짐없이 실행되어야 할 경우.
  • 작업자 스레드 수를 고정하고 Queue와 함께 사용하면, 스레드를 사용할 때 팬인과 팬아웃의 규모 확장성을 개선할 수 있다.
  • 단점
    • Queue를 사용하도록 기존 코드를 리팩터링 하려면 상당히 많은 작업이 필요하다.
    • Queue는 프로그램이 활용할 수 있는 전체 I/O 병렬성의 정도를 제한한다는 단점이 있다.

Lock 모듈을 사용할 수 있다.

  • MutEx(Mutual Exclusion=상호배제) 라고도 한다.
  • 멀티 threading 에서 메모리 동시 접근 문제를 해결하는 방법론
  • 메모리 접근 권한
    • lock = threading.Lock()
    • lock.acquire()
    • lock.release()
    • lock 의 문제점:
      • 공유 자원에 대해 오직 하나의 thread 만 접근 가능하다는 점

Semaphore 모듈을 사용할 수 있다.

  • 정해놓은 갯수만큼의 스레드를 허용하는 동기화 객체 (예를들어 최대 50개 까지만 동시에 실행)
import threading
import glob
import time
from typing import List

sema = threading.Semaphore(10)

def copy_from(source: str) -> None:
    """
    주어진 파일을 복사하여 'copied_csv' 폴더에 저장합니다.

    Args:
        source (str): 원본 파일 경로

    """
    target = "copied_csv/{}".format(source.split("/")[-1])

    sema.acquire()
    time.sleep(2)

    print("copy ({})".format(source))
    with open(source, "r") as rf:
        with open(target, "w") as wf:
            wf.write(rf.read())

    sema.release()

def parallel_copy(source_path: str) -> None:
    """
    주어진 경로에 있는 모든 파일을 병렬로 복사합니다.

    Args:
        source_path (str): 복사할 파일들이 있는 경로 (글로벌 패턴)

    """
    thread_list: List[threading.Thread] = [threading.Thread(target=copy_from, args=(source,)) for source in glob.glob(source_path)]

    for thread in thread_list:
        thread.start()

    for thread in thread_list:
        thread.join()

def main() -> None:
    """
    주어진 경로의 모든 파일을 병렬로 복사하고 실행 시간을 출력합니다.
    """
    source_path = "source_csv/*"
    start = time.time()

    parallel_copy(source_path)

    print(time.time() - start)

if __name__ == "__main__":
    main()
  • semaphore이 0이 되면 작업을 중지하고 lock이 해제된다.
  • sema = Semaphore(1)
  • sema.acquire()
  • sema.release()

threading.Event()

  • threading.Event 객체는 Python에서 멀티쓰레딩 환경에서 스레드 간의 신호를 주고받기 위한 동기화 프리미티브
  • Event 객체는 내부적으로 불리언 플래그를 유지하며, 플래그가 설정(set)되었는지 여부를 스레드들이 확인하고 대기할 수 있습니다.
  • 이를 통해 여러 스레드 간의 통신 및 동기화를 간편하게 할 수 있습니다.

threading.Event 객체의 사용 목적

  1. 스레드 간의 신호 전달:

    • Event 객체를 사용하여 하나의 스레드가 다른 스레드에게 신호를 보낼 수 있습니다.
    • 신호를 받은 스레드는 특정 작업을 수행하거나, 대기 상태에서 깨어날 수 있습니다.
  2. 동기화:

    • 여러 스레드가 특정 조건이 충족될 때까지 대기하도록 할 수 있습니다.
    • Event 객체의 상태가 설정되면(set) 대기 중인 모든 스레드가 깨어나 실행을 재개합니다.
  3. 상태 플래그 관리:

    • Event 객체는 상태 플래그를 설정(set), 리셋(reset), 대기(wait)하는 메서드를 제공합니다.
    • 이를 통해 스레드 간의 복잡한 상태 관리를 단순화할 수 있습니다.

threading.Event 객체의 주요 메서드

  • set(): 이벤트 플래그를 True로 설정하여 모든 대기 중인 스레드를 깨웁니다.
  • clear(): 이벤트 플래그를 False로 설정
  • wait(timeout=None): 이벤트 플래그가 True가 될 때까지 대기합니다. 선택적으로 타임아웃을 설정할 수 있습니다.
  • is_set(): 이벤트 플래그의 현재 상태를 반환합니다.

멀티 쓰레딩 환경에서의 threading.Event 사용 예시

import threading
import time
from typing import List

# Event 객체 생성
event: threading.Event = threading.Event()

def worker(event: threading.Event, worker_id: int) -> None:
    print(f"Worker {worker_id} waiting for event to be set")
    event.wait()  # 이벤트가 설정될 때까지 대기
    print(f"Worker {worker_id} event received, starting work")

# 스레드 생성
threads: List[threading.Thread] = []
for i in range(3):
    thread: threading.Thread = threading.Thread(target=worker, args=(event, i))
    threads.append(thread)
    thread.start()

time.sleep(2)
print("Main thread setting event")
event.set()  # 이벤트 설정

# 모든 스레드가 종료될 때까지 대기
for thread in threads:
    thread.join()

print("All workers have finished")

하나의 Event 객체를 여러 스레드에서 공유해서 사용하는 경우

  • 위 예시에서 볼 수 있듯이, 하나의 Event 객체를 여러 스레드에서 공유하여 사용하는 것이 가능
  • 실제로 이는 Event 객체의 주요 사용 사례 중 하나
  • 여러 스레드가 동일한 Event 객체를 대기하다가, 특정 시점에 메인 스레드나 다른 스레드에서 이벤트를 설정(set)하면 모든 대기 중인 스레드가 깨어나서 작업을 시작하게 됩니다.
profile
새로운 것이 들어오면 이미 있는 것과 충돌을 시도하라.

0개의 댓글