asyncio 모듈 (1): 기초

About_work·2022년 12월 14일
0

process, thread

목록 보기
3/23

다루는 내용

  • asyncio 언제 써야해? 장점?
  • 기초 예시, 기초 예시 코드
  • 기초 개념
  • awatiable: coroutine, Future , Task
  • 다중 작업을 동시에 실행시키는 방법?

언제 써야해? 장점?

  • 한줄요약: 1개의 주방 / 1개의 chef / 10개의 해야할 요리
  • 코루틴은 스레드와 달리 아래의 것들이 필요 없다.
    • 메모리 부가 비용
    • 스레드 시작 비용
    • 컨텍스트 전환 비용
    • 복잡한 lock(동기화 코드) 이용
  • 시스템 콜(블록킹 I/O와 스레드 시작도 포함해)을 코루틴으로 만들면, 프로그램의 응답성이 좋아지고 사용자가 느끼는 지연 시간을 줄일 수 있다.
    • 시스템 콜
      • 운영 체제의 커널이 제공하는 서비스에 대해, 응용 프로그램의 요청에 따라 커널에 접근하기 위한 인터페이스이다.
      • 언어로 작성된 프로그램들은 직접 시스템 호출을 사용할 수 없기 때문에, 고급 API를 통해 시스템 호출에 접근하게 하는 방법이다.
  • 작업의 진행 상황을 반복적으로 측정하는 event loop 가 존재하여, 각 작업들을 효율적으로 관리해주므로 → I/O 대기 시간이 효율적으로 관리됩니다.
    • 다른 멀티 스레딩 패키지들은 "non-blocking APIs" 라는 점에서 비동기이지만, 여전히 thread/process pools가 알아서 하는 것에 의존하고 있다.
    • request 요청 시점과 - 결과 반환 시점을 알 수 있다면 → asyncio를 써서 보다 효율적으로 다른 작업으로 전환할 수 있다.
  • (확인 필요)코루틴이 여러개 수행되더라도, 단일 스레드에서 수행하기 때문에 lock을 사용할 필요가 없다. (I/O 는 asyncio가 제공하는 이벤트 루프의 일부분으로 병렬화된다.)
  • blocking call이 전혀 없다. asyncio.run() entry point가 유일한 blocking part이다.
  • asyncio는 multiple servers와 연결된 servers와 clients 의 선택 도구로 사용되야 한다.
import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))
    return

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

기초 예시

코루틴?

  • 코루틴은 비동기 프로그래밍을 위해 사용되는 함수

  • 실행을 중지하고 재개할 수 있음

  • async와 await 키워드를 사용하여 정의되며, I/O 바운드 작업을 효율적으로 처리하기 위해 사용

  • 코루틴은 비동기 작업을 순차적으로 작성할 수 있게 해주어, 복잡한 비동기 코드를 더 쉽게 관리할 수 있게 해줍니다.

  • coroutine

    • 실행을 일시적으로 중단하고 다시 시작할 수 있도록 함
    • 코루틴은 동시성은 제공하지만, 병렬성은 제공하지 않습니다.
    • 외부에서 제어할 수 없는 비선점 멀티태스킹을 지원
      • 비선점 멀티태스킹: CPU를 차지하고 있는 스레드가, 자신이 이제 CPU 연산이 필요 없음을 나타냈을 때에만 OS가 이를 회수할 수 있는 경우
      • 선점형 멀티태스킹: 특정 스레드가 CPU를 차지해서 사용하더라도, OS가 타이머나 여타 트리거를 통해 개입하여 강제로 CPU 사용을 빼았아 올 수 있는 경우
async def abc():
    코드
ABC = abc() # 실행 안됨
await ABC # 실행됨
  • abc 함수가 코루틴이 된다.
  • 코루틴은 비동기로 실행된다.
  • ABC = abc()
    • 코드가 실행되지 않고, 코루틴 객체(인스턴스)를 리턴할 뿐이다.
    • "await 코루틴 인스턴스를 해야만"!!
      • 코루틴 실행
    • 팬 아웃 수행

기초 예시 코드

  • asyncio는 coroutines를 이용하여 쓰인다.
  • coroutines는 await를 이용하여 흥미로운 일이 일어날 때까지 parallel하게 기다린다(suspend).
  • suspending은 blocking이랑 다르다. suspend는 event loop thread를 허용한다. → 다른 것들을 계속하기 위해
  • (확인 필요)코루틴 안에서 다른 코루틴을 실행할 때는 await을 이용합니다.
import asyncio
 
async def add(a, b):
    print('add: {0} + {1}'.format(a, b))
    await asyncio.sleep(1.0)    # 1초 대기. asyncio.sleep도 네이티브 코루틴
    return a + b    # 두 수를 더한 결과 반환
 
async def print_add(a, b):
    result = await add(a, b)    # await로 다른 네이티브 코루틴 실행하고 반환값을 변수에 저장
    print('print_add: {0} + {1} = {2}'.format(a, b, result))
 
loop = asyncio.get_event_loop()             # 이벤트 루프를 얻음
loop.run_until_complete(print_add(1, 2))    # print_add가 끝날 때까지 이벤트 루프를 실행
loop.close()                                # 이벤트 루프를 닫음
  • loop = asyncio.get_event_loop()
    • 현재의 이벤트 루프를 기져옵니다.
    • 코루틴 또는 콜백에서 호출될 때, 이 함수는 항상 실행 중인 이벤트 루프를 반환합니다.
    • 이 함수는 다소 복잡한 동작을 하므로, coroutine과 callback에서 get_event_loop 보다 get_running_loop 함수를 사용하는 것이 좋습니다.
  • loop.run_until_complete(future)
    • future 는 코루틴 객체 혹은 future 객체
    • future(Future의 인스턴스)가 완료할 때까지 이벤트 루프를 실행합니다.
    • 코루틴이 이벤트루프에서 실행되도록 예약하고, 해당 코루틴이 끝날 때까지 기다립니다.
    • 인자가 코루틴 객체면, asyncio.Task로 실행되도록 묵시적으로 예약 됩니다.
    • 퓨처의 결과를 반환하거나 퓨처의 예외를 일으킵니다.
  • loop.close()
    • 이벤트 루프를 닫습니다.
    • 이 함수를 호출할 때 루프는 반드시 실행 중이지 않아야 합니다. 계류 중인 모든 콜백을 버립니다.
    • 이 메서드는 모든 큐를 비우고 실행기를 종료하지만, 실행기가 완료할 때까지 기다리지 않습니다.
    • 이 메서드는 되돌릴 수 없습니다. 이벤트 루프가 닫힌 후에 다른 메서드를 호출해서는 안 됩니다.

기초 개념

  • single process / single thread
  • 활성화된 코루틴은 종료될 때까지 1KB 미만의 메모리를 사용한다.
  • 스레드와 마찬가지로, 코루틴도 환경으로부터 입력을 소비하고 결과를 출력할 수 있는 독립적인 함수다.
  • 코루틴은 매 await 식에서 일시 중단되고, 일시 중단된 대기 가능성이 해결된 다음에, async 함수로부터 실행을 재개한다는 차이점이 있다.
  • 외부 환경에 대한 명령(예: I/O)와, 원하는 명령을 수행하는 방법을 구현하는 것(예: 이벤트 루프)를 코드에서 분리해준다.
  • asyncio 내장 모듈을 사용하면, thread와 블로킹 I/O를 사용하는 기존 코드를 → 코루틴과 비동기 I/O를 사용하는 코드로 쉽게 포팅할 수 있다.

Awaitable: coroutine , Future , Task

  • awaitable
    • await을 할 수 있는 객체
    • coroutines, Tasks, Futures가 해당
  • coroutines
    • 코루틴 인스턴스를 await 호출했을 때, 비로소 수행된다.
    • 코루틴은 동시성은 제공하지만, 병렬성은 제공하지 않습니다.
  • asyncio.Future
    • 미래에 할 일을 표현하는 클래스인데 할 일을 취소하거나 상태 확인, 완료 및 결과 설정에 사용합니다.
    • Future는 coroutine을 예외 처리들을 위해 감싼 것
  • asyncio.Task
    • Future을 상속해서 만든 클래스
    • asyncio.Future의 기능 + 실행할 코루틴의 객체를 포함하고 있습니다.
    • 수행할 코루틴을 이벤트 루프에 전달합니다.
    • 코루틴이 future을 기다리면, task는 코루틴의 수행을 중단하고 future과 완료되기를 기다립니다.
    • future이 완료되면 코루틴의 수행이 재개됩니다.
    • 태스크는 코루틴의 실행을 취소하거나 상태 확인, 완료 및 결과 설정에 사용합니다.
    • coroutine과 차이점
      • 태스크를 만든 시점에 코루틴을 수행하기 시작합니다.
      • (논란) 태스크는 병렬성도 제공하나?
    • Future와의 차이점: Task는 Futures에 event loop와 같이 연계한 것이라고 보면 된다.

future

async def set_after(fut, delay, value):
    # Sleep for *delay* seconds.
    await asyncio.sleep(delay)

    # Set *value* as a result of *fut* Future.
    fut.set_result(value)

async def main():
    # Get the current event loop.
    loop = asyncio.get_running_loop()

    # Create a new Future object.
    fut = loop.create_future()

    # Run "set_after()" coroutine in a parallel Task.
    # We are using the low-level "loop.create_task()" API here because
    # we already have a reference to the event loop at hand.
    # Otherwise we could have just used "asyncio.create_task()".
    loop.create_task(
        set_after(fut, 1, '... world'))

    print('hello ...')

    # Wait until *fut* has a result (1 second) and print it.
    print(await fut)

asyncio.run(main())
  • asyncio.run(coro, debug=None)

    • 코루틴 coro을 이벤트 루프상에서 실행하고, 각 함수가 의존하는 I/O를 수행한다.
    • 코루틴 coro를 실행하고 결과를 반환합니다.
    • 이 함수는 전달된 코루틴을 실행하여 → 비동기 이벤트 루프를 관리하고 + 비동기 generators를 최종화하고 + threadpool을 닫습니다.
    • 다른 비동기 이벤트 루프가 동일한 thread에서 실행 중일 때는, 이 함수를 호출할 수 없습니다.
    • debug
      • True 이면
        • 이벤트 루프가 debug 모드에서 실행됩니다.
        • 이벤트 루프가 빨리 반응하지 못하게 방해하는 코루틴을 식별할 수 있다.
      • False는 디버그 모드를 명시적으로 비활성화 합니다.
      • None이면 global debug mode setting을 따르겠다는 뜻
    • 이 함수는 항상 새 이벤트 루프를 생성하고 마지막에 닫습니다.
    • 비동기 프로그램의 main entry point로 사용되어야 하며, 이상적으로는 한번만 호출되어야 합니다.
  • loop.create_future()

    • 이벤트 루프에 연결된 asyncio.Future 객체를 만듭니다.
    • 이것이 asyncio에서 퓨처를 만드는 데 선호되는 방법입니다.

future = asyncio.Future(*, loop=None)

  • Future는 비동기 연산의 최종 결과를 나타냅니다.
  • 스레드 안전하지 않습니다.
  • awaitable object 이다.
  • 코루틴은 그들이 "result나 예외를 내거나, 혹은 취소가 될 때"까지 Future objects를 기다릴 수 있습니다.
  • 간단한 규칙은 사용자가 만나는 API에서 Future 객체를 절대 노출하지 않는 것이며, Future 객체를 만드는 권장 방법은 loop.create_future()를 호출하는 것입니다.

future.result()

  • Future의 결과를 반환합니다.
  • Future가 완료(done)했고 set_result() 메서드로 결과가 설정되었으면, 결괏값이 반환됩니다.
  • Future가 완료(done)했고 set_exception() 메서드로 예외가 설정되었으면, 이 메서드는 예외를 발생시킵니다.
  • Future가 취소(cancelled)되었으면, 이 메서드는 CancelledError 예외를 발생시킵니다.
  • Future의 결과를 아직 사용할 수 없으면, 이 메서드는 InvalidStateError 예외를 발생시킵니다.

future.set_result(result)

  • Future를 완료(done)로 표시하고, 그 결과를 설정합니다.
  • Future가 이미 완료(done)했으면, InvalidStateError 에러를 발생시킵니다.

future.set_exception(exception)

  • Future를 완료(done)로 표시하고, 예외를 설정합니다.
  • Future가 이미 완료(done)했으면, InvalidStateError 에러를 발생시킵니다.

future.done()

  • Future가 완료(done)했으면 True를 반환합니다.
  • Future는 취소(cancelled)되었거나 set_result() 나 set_exception() 호출로 결과나 예외가 설정되면 완료(done)됩니다.

future.cancelled()

  • Future가 최소(cancelled)되었으면, True를 반환합니다.
  • 이 메서드는 대개 결과나 예외를 설정하기 전에 Future가 취소(cancelled)되었는지 확인하는 데 사용됩니다:
if not fut.cancelled():
    fut.set_result(42)

future.add_done_callback(callback, *, context=None)

  • Future가 완료(done)될 때 실행할 콜백을 추가합니다.
  • callback는 유일한 인자인 Future 객체로 호출됩니다.
# Call 'print("Future:", fut)' when "fut" is done.
fut.add_done_callback(
    functools.partial(print, "Future:"))

future.cancel(msg=None)

  • Future를 취소하고 콜백을 예약합니다.
  • Future가 이미 완료(done)했거나 취소(cancelled)되었으면, False를 반환합니다.
  • 그렇지 않으면 Future의 상태를 취소(cancelled)로 변경하고, 콜백을 예약한 다음 True를 반환합니다.

future.exception()

  • 이 Future에 설정된 예외를 반환합니다.
  • Future가 완료(done)했을 때만 예외(또는 예외가 설정되지 않았으면 None)가 반환됩니다.
  • Future가 취소(cancelled)되었으면, 이 메서드는 CancelledError 예외를 발생시킵니다.
  • Future가 아직 완료(done)하지 않았으면, 이 메서드는 InvalidStateError 예외를 발생시킵니다.

future.get_loop()

  • Future 객체가 연결된 이벤트 루프를 반환합니다.

Task

async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())

    # Wait for 1 second
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

# Expected output:
#
#     cancel_me(): before sleep
#     cancel_me(): cancel sleep
#     cancel_me(): after sleep
#     main(): cancel_me is cancelled now

task = asyncio.Task(coro, *, loop=None, name=None)

  • 파이썬 코루틴을 실행하는 퓨처류 객체입니다. 스레드 안전하지 않습니다.
  • 태스크는 이벤트 루프에서 코루틴을 실행하는 데 사용됩니다.
  • 만약 코루틴이 Future를 기다리고 있다면, 태스크는 코루틴의 실행을 일시 중지하고 Future의 완료를 기다립니다.
  • 퓨처가 완료되면, 감싸진 코루틴의 실행이 다시 시작됩니다.
  • 이벤트 루프는 협업 스케줄링을 사용합니다:
    • 이벤트 루프는 한 번에 하나의 Task를 실행합니다.
    • Task가 Future의 완료를 기다리는 동안, 이벤트 루프는 다른 태스크, 콜백을 실행하거나 IO 연산을 수행합니다.
  • 테스크를 만들려면 고수준 asyncio.create_task() 함수를 사용하거나, 저수준 loop.create_task() 나 ensure_future() 함수를 사용하십시오.
  • 태스크의 인스턴스를 직접 만드는 것은 권장되지 않습니다.
  • 실행 중인 Task를 취소하려면 cancel() 메서드를 사용하십시오.
  • 이를 호출하면 태스크가 감싼 코루틴으로 CancelledError 예외를 던집니다.
  • 코루틴이 취소 중에 Future 객체를 기다리고 있으면, Future 객체가 취소됩니다.
  • cancelled()는 태스크가 취소되었는지 확인하는 데 사용할 수 있습니다.
  • 이 메서드는 감싼 코루틴이 CancelledError 예외를 억제하지 않고 실제로 취소되었으면 True를 반환합니다.
  • asyncio.Task는 Future.set_result()와 Future.set_exception()을 제외한 모든 API를 Future에서 상속받습니다.

task = asyncio.create_task(coro, *, name=None, context=None)

  • coro 코루틴을 Task로 감싸고 실행을 예약합니다(스케줄링 합니다). Task 객체를 반환합니다.
  • 큐에 넣은 후 이벤트 루프에서 실행한다.
  • 팬 아웃 접근 방법
  • name이 None이 아니면, Task.set_name()을 사용하여 태스크의 이름으로 설정됩니다.
  • get_running_loop()에 의해 반환된 루프에서 태스크가 실행되고, 현재 스레드에 실행 중인 루프가 없으면 RuntimeError가 발생합니다.

다중 작업을 동시에 실행시키는 방법?

async def simulate(grid):
    next_grid = Grid(grid.height, grid.width)

    coroutine_instances = []
    for y in range(grid.height):
        for x in range(grid.width):
            task = step_cell(
                y, x, grid.get, next_grid.set)  # 팬아웃
            coroutine_instances.append(task)

    await asyncio.gather(*coroutine_instances)  # 팬인

    return next_grid

grid = Grid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

columns = ColumnPrinter()
for i in range(5):
    columns.append(str(grid))
    # python 3.7이상에서만 asyncio.run을 제공함
    grid = asyncio.run(simulate(grid)) # 이벤트 루프를 실행한다

print(columns)
  • awaitable asyncio.gather(*aws, return_exceptions=False)
    • aws 시퀀스에 있는 어웨이터블 객체를 동시에 실행합니다.
    • aws에 있는 어웨이터블이 코루틴이면 자동으로 태스크로 예약됩니다.
    • 모든 어웨이터블이 성공적으로 완료되면, 결과는 반환된 값들이 합쳐진 리스트입니다.
    • await을 앞에 붙이면, 성공될 때까지 기다립니다. fan-in / join의 역할을 합니다.
    • 결괏값의 순서는 aws에 있는 어웨이터블의 순서와 일치합니다.
    • return_exceptions가 False(기본값)면, 첫 번째 발생한 예외가 gather()를 기다리는 태스크로 즉시 전파됩니다.
    • aws 시퀀스의 다른 어웨이터블은 취소되지 않고 계속 실행됩니다.
    • return_exceptions가 True면, 예외는 성공적인 결과처럼 처리되고, 결과 리스트에 집계됩니다.
    • gather()가 취소되면, 모든 제출된 (아직 완료되지 않은) 어웨이터블도 취소됩니다.
    • aws 시퀀스의 Task나 Future가 취소되면, 그것이 CancelledError를 일으킨 것처럼 처리됩니다 – 이때 gather() 호출은 취소되지 않습니다.
      • 이것은 제출된 태스크/퓨처 하나를 취소하는 것이 다른 태스크/퓨처를 취소하게 되는 것을 막기 위한 것입니다.
profile
새로운 것이 들어오면 이미 있는 것과 충돌을 시도하라.

0개의 댓글