asyncio 모듈 (3): 심화

About_work·2022년 12월 14일
0

process, thread

목록 보기
1/23

목차

  • loop.run_in_executor()
  • loop.run_until_complete()
  • asyncio.run_coroutine_threadsafe
  • asyncio.new_event_loop()

메인 코드

asyncio.run(run_tasks_mixed(handles, 0.1, output_path))

confirm_merge(input_paths, output_path)

tmpdir.cleanup()

async def run_tasks_mixed(handles, interval, output_path):
    # get_event_loop: 현재의 이벤트 루프를 가져옵니다.
    loop = asyncio.get_event_loop()

    with open(output_path, 'wb') as output:
        async def write_async(data):
            output.write(data)

        def write(data):
            coro = write_async(data)
            future = asyncio.run_coroutine_threadsafe(
                coro, loop)
            future.result()

        futures = []
        for handle in handles:
            future = loop.run_in_executor(
                None, tail_file, handle, interval, write)
            futures.append(future)
        # asyncio.gather 과 await을 이용하여, tail_file 이 모두 종료되도록 팬인 시킨다.
        await asyncio.gather(*futures)
        
def tail_file(handle, interval, write_func):
    """
    Objectives
        - 새로운 줄이 들어오면, 주어진 콜백 함수로 그 줄을 전달해서 -> 출력 로그에 데이터를 기록한다.
        - 데이터가 없는 경우 -> 데이터를 폴링하며 대기하느라 CPU를 낭비하는 시간을 줄이기 위해 thread를 일시 정지 상태로 만든다.
        - 폴링
            -  하나의 장치(또는 프로그램)가 충돌 회피 또는 동기화 처리 등을 목적으로 다른 장치(또는 프로그램)의 상태를 주기적으로 검사하여 일정한 조건을 만족할 때 송수신 등의 자료처리를 하는 방식을 말한다
    """
    while not handle.closed:
        try:
            line = readline(handle)
        except NoNewData:
            time.sleep(interval)
        else:
            write_func(line)

awaitable loop.run_in_executor(executor, func, *args)

  • 지정된 실행기에서 func 가 호출되도록 배치합니다.
  • executor 인자는 Executor 인스턴스여야 합니다. executor 가 None 이면 기본 실행기가 사용됩니다.
  • 이 메서드는 asyncio.Future 객체를 반환합니다.
  • func 에 키워드 인자를 전달하려면 functools.partial()를 사용하십시오.
    • loop.run_in_executor(None, functools.partial(print, 'Hello', 'Python', end=' '))
    • functools.partial(함수, 위치인수, 키워드인수)
    • functools.partial은 인수가 포함된 부분 함수를 반환하는데, 반환된 함수에 다시 인수를 지정해서 호출할 수 있습니다.

awaitable asyncio.gather(*aws, return_exceptions=False)

  • aws 시퀀스에 있는 어웨이터블 객체를 동시에 실행합니다.
  • aws에 있는 어웨이터블이 코루틴이면 자동으로 태스크로 예약됩니다.
  • 모든 어웨이터블이 성공적으로 완료되면, 결과는 반환된 값들이 합쳐진 리스트입니다.
  • await을 앞에 붙이면, 성공될 때까지 기다립니다. fan-in / join의 역할을 합니다.
  • 결괏값의 순서는 aws에 있는 어웨이터블의 순서와 일치합니다.
  • return_exceptions가 False(기본값)면, 첫 번째 발생한 예외가 gather()를 기다리는 태스크로 즉시 전파됩니다.
  • aws 시퀀스의 다른 어웨이터블은 취소되지 않고 계속 실행됩니다.
  • return_exceptions가 True면, 예외는 성공적인 결과처럼 처리되고, 결과 리스트에 집계됩니다.
  • gather()가 취소되면, 모든 제출된 (아직 완료되지 않은) 어웨이터블도 취소됩니다.
  • aws 시퀀스의 Task나 Future가 취소되면, 그것이 CancelledError를 일으킨 것처럼 처리됩니다 – 이때 gather() 호출은 취소되지 않습니다.
    • 이것은 제출된 태스크/퓨처 하나를 취소하는 것이 다른 태스크/퓨처를 취소하게 되는 것을 막기 위한 것입니다.

asyncio.run_coroutine_threadsafe(coro, loop)

  • 주어진 이벤트 루프에 코루틴을 제출합니다. 스레드 안전합니다.
  • Lock과 도우미 함수를 쓸 필요가 없다.
  • 다른 OS 스레드에서 결과를 기다리는 concurrent.futures.Future를 반환합니다.
  • 이 함수는 이벤트 루프가 실행 중인 스레드가 아닌, 다른 OS 스레드에서 호출하기 위한 것입니다.

리펙토링 후, 코드

asyncio.run(run_fully_async(handles, 0.1, output_path))

confirm_merge(input_paths, output_path)

tmpdir.cleanup()

async def run_fully_async(handles, interval, output_path):
    async with WriteThread(output_path) as output:
        tasks = []
        for handle in handles:
            coro = tail_async(handle, interval, output.write)
            task = asyncio.create_task(coro)
            tasks.append(task)

        await asyncio.gather(*tasks)

class WriteThread(Thread):
    def __init__(self, output_path):
        super().__init__()
        self.output_path = output_path
        self.output = None
        self.loop = asyncio.new_event_loop()

    def run(self):
        asyncio.set_event_loop(self.loop)
        with open(self.output_path, 'wb') as self.output:
            self.loop.run_forever()

        # 맨 마지막에 한번 더 이벤트 루프를 실행해서
        # 다른 이벤트 루프가 stop()에 await하는 경우를 해결한다.
        self.loop.run_until_complete(asyncio.sleep(0))
        
    async def real_write(self, data):
        self.output.write(data)

    async def write(self, data):
        coro = self.real_write(data)
        future = asyncio.run_coroutine_threadsafe(
            coro, self.loop)
        await asyncio.wrap_future(future)

###
async def tail_async(handle, interval, write_func):
    loop = asyncio.get_event_loop()

    while not handle.closed:
        try:
            line = await loop.run_in_executor(
                None, readline, handle)
        except NoNewData:
            await asyncio.sleep(interval)
        else:
            await write_func(line)
           

asyncio.wrap_future(future, *, loop=None)

  • concurrent.futures.Future 객체를 asyncio.Future 객체로 감쌉니다.

리펙토링 잘 하는 법(코드를 점진적으로 asyncio와 코루틴 기반으로 바꾸는 법)

  • 기본

    • 블로킹 I/O 에 thread를 사용하는 부분과 코루틴을 사용하는 부분이 서로 호환되면서 공존할 수 있어야 → 리펙토링이 쉽다.
    • 여러분의 thread가 코루틴을 실행할 수 있어야 하고, 코루틴이 thread를 시작하거나 기다릴 수 있어야 한다는 뜻이다.
      • 다행히 asyncio에는 이런 상호작용을 쉽게 제공할 수 있는 도구가 들어 있다.
  • 하향식(top-down)

    • main 진입점부터 → 점차 호출 계층의 잎 부분에 위치한 개별 함수와 클래스로 내려가면서 작업한다는 뜻
    • 공통 모듈이 많이 있다면 이런 접근 방법이 유용할 수 있다.
      • 최상위 함수가 def 대신 async def를 사용하게 변경하라.
      • 최상위 함수가 I/O를 호출하는 모든 부분(이벤트 루프가 블록될 가능성이 있다.)을 asyncio.run_in_executor로 감싸라.
      • run_in_executor 호출이 사용하는 자원이나 콜백이 제대로 동기화(예: lock이나, asyncio.run_coroutine_threadsafe 함수를 사용) 했는지를 확인하라.
      • 호출 계층의 잎 쪽으로 내려가면서, 중간에 있는 함수와 메서드를 코루틴으로 변환하며 → get_event_loop와 run_in_executor 호출을 없애려고 시도하라. (이때 변환할 내부 코드에 따라 1~3 단계를 다시 따른다.)
  • 상향식(down-top)

    • 프로그램에서 잎 부분에 있는, 포팅하려는 함수의 비동기 코루틴 버전을 새로 만들라.
    • 기존 동기 함수를 변경해서 → 코루틴 버전을 호출하고 → 실제 동작을 구현하는 대신 이벤트루프를 실행하게 하라.
    • 호출 계층을 한 단계 올려서 다른 코루틴 계층을 만들고, 기존에 동기적 함수를 호출하던 부분을 1단계에서 정의한 코루틴 호출로 바꿔라.
    • 이제 비동기 부분을 결합하기 위해 2단계에서 만든 동기적인 래퍼가 더 이상 필요하지 않다. 이를 삭제하라.
profile
새로운 것이 들어오면 이미 있는 것과 충돌을 시도하라.

0개의 댓글