스레드를 사용한 I/O를 어떻게 asyncio로 포팅할 수 있는지 알아두라

매일 공부(ML)·2022년 9월 26일
0

이어드림

목록 보기
137/146

스레드를 사용한 I/O를 어떻게 asyncio로 포팅할 수 있는지 알아두라

스레드와 블로킹 I/O를 사용하는 코드를 코루틴과 비동기 I/O를 사용하는 코드로 옮기기 쉽다.
#서버가 보내거나 받는 메시지 한 줄 한 줄은 처리할 명령을 표현한다.
class EOFError(Exception):
    pass

class ConnectionBase:
    def __init__(self, connection):
        self.connection =  connection
        self.file = connetion.makefile('rb')

    def send(self, command):
        line  = command + '\n'
        data = line.encode()
        self.connection.send(data)

    def receive(self):
        line =  self.file.readline()
        if not line:
            
            raise EOFError('Connection closed')
        return line[:-1].decode()
#한 번에 하나씩 연결하여 클라이언트의 세션 상태 유지 클래스 구현

import random 

WARMER = '더 따뜻함'
COLOER = '더 차가움'
UNSURE = '잘 모르겠음'
CORRECT = '맞음'

class UnKnownCommandError(Exception):
    pass

class Session(ConnectionBase):
    def __init__(self, *args):
        super().__init__(*args)
        self._clear_state(None, None)

    def _clear_state(self, lower, upper):
        self.lower = lower
        self.upper = upper
        self.secret = None
        self.guesses = []
#이 메서드는 클라이언트에서 들어오는 메시지를 처리해 명령에 맞는 메서드 호출

def loop(self):
    while command:=self.receive():
        parts = command.split(' ')
        if parts[0] == 'PARAMS':
            self.set_params(parts)
        elif parts[0] == 'NUMBER':
            self.send_number()
        elif parts[0] == 'REPORT':
            self.receive_report(parts)
        else:
            raise UnKnownCommandError(command)
#서버가 추측할 값의 상하한 구하기(fIRST)
def set_params(self, parts):
    assert len(parts[1])
    lower = int(parts[1])
    upper = int(parts[2])
    self._clear_state(lower, upper)
#클라이언트에 해당하는 Session 인스턴스에 저장된 이전 상태를 바탕으로 새로운 수 추측(second)


def next_guess(self):
    if self.secret is not None:
        return self.secret

    while True:
        guess = random.randint(self.lower, self.upper)
        if guress not in self.guesses:
            return guess


def send_number(self):
    guess = self.next_guess()
    self.guess.append(guess)
    self.send(format(guess))
#서버의 추측이 따듯한지 차가운지에 대해서 클라이언트가 보낸 결과를 받은 후 session상태
def receive_report(self, parts):
    assert len(parts) == 2
    decision = parts[1]

    last = self.guesses[-1]
    if decision == CORRECT:
        self.secret = last

    print(f'서버: {last}는 {decision}')
#클라이언트도 상태가 있는 클래스를 사용해서 구현

import contextlib
import math

class Client(ConnectionBase):
    def __init__(self, *args):
        super().__init__(*args)
        self._clear_state()

    def _clear_state(self):
        self.secret = None
        self.last_distance = None
이제 이걸 사용하려면 async, await, asyncio 내장 모듈을 사용해서 변환

#ConnectionBase 클래스가 블로킹 I/O 대신 send와 receive라는 코루틴 제공

class AsyncConnectionBase:
    def __init__(self, reader, writer): #변경됨
        self.reader = reader #변경됨
        self.writer = writer

    async def send(self, command):
        line = command + '\n'
        data = line.encode()
        self.writer.wrtie(data) #변경됨
        await self.writer.drain() #변경됨

    async def receive(self):
        line = await self.reader.readline()
        if not line:
            raise EOFError('연결 닫힘')

        return line[:-1].decode()

#단일 연결의 세션 상태를 표현하기 위해 상태를 담는 클래스를 추가해여한다.
#위와 같은 방식으로 변경을 하여 상속을 진행한다.

class AsyncSession(AsyncConnectionBase):
    def __init__(self, *args):
        ...

    def _clear_values(self, lower, upper):
        ...
꼭 필요한 부분만 바꿔서 코루틴이 되게 한다.

async def loop(self): #변경됨
    while command := await self.receive(): #변경됨
        parts = command.split(' ')
        if parts[0] == 'PARAMS':
            self.set_params(parts)
        elif parts[0] == 'NUMBER':
            await self.send_number() #변경됨
        elif parts[0] == 'REPORT':
            self.receive_report(parts)
        else:
            raise UnKnownCommandError(command)
#first 코드는 바꿀게 없다
#두번째 명령을 처리하는 메서드는 바꿀 부분은 추측한 값을 클라이언트에게 송시하여 비공기 I/O쓴다.
def next_guess(self):
    ...
async def send_number(self): #변경됨
    guess = self.next_guess()
    self.guesses.append(guess)
    await self.send(format(guess)) #변경됨
#세번ㅉ재 명령을 처리하는데 바꿀 메서드 없다.
#클라이언트 클래스도 AsyncConnectionBase를 상속

class AsyncClient(AsyncConnectionBase):
    def __init__(self, *args):
        ...
    def _clear_state(self):
        ...

    #클라이언트의 첫 번재 명령을 처리하는 메서드
    #몇 부분은 async와 await 키워드를 추가
    #contextlib 내장 모듈에서 async contextmanager 도우미 함수를 가져와 사용

    @contextlib.asynccontextmanager
    async def seesion(self, lower, upper, secret): #변경됨
        print(f'\n{lower}와 {upper} 사이의 수자를 맞춰보세요!'
              f' 쉿! 그 숫ㅈ는 {secret}입니다.')
        self.secret = secret
        await self.send(f'PARAMS {lower} {upper}')#변경됨
        try:
            yield
        finally:
            self._clear_state()
            await self.send('PARAMS 0 -1')
#두 번째 명령에서도 코루틴 동작이 필요한 부분에 ASYNC와 AWAIT 추가

async def request_numbers(self, count): #변경됨
    for _ in range(count):
        await self.send('NUMBER')
        data = await self.receive()
        yield int(data)
        if self.last_distance == 0:
            return
#세 번째 명령에서는 async와 await를 각각 하나씩만 추가
async def report_outcome(self, number): #변경됨
    await self.send(f'REPORT {decision}') #변경됨
#서버를 실행하는 코드는 asyncio 내장 모듈과 그 모듈 내에 start_server함수 사용
import asyncio

async def handle_async_connection(reader, writer):
    seesion = AsyncSession(reader, writer)
    try:
        await session.loop()
    except EOFError:
        pass


async def run_async_server(address):
    server = await asynico.start_server(
        handle_async_connection, *address)
    async with server:
        await server.serve_forever()
#함수에서 코루틴과 상호작용하는 다른 부분에는 async와 await 키워드를 적절히 추가
# 추가하지 않으면 실행 시점에 예외가 발생한다.

async def run_async_client(address):
    streams = await asyncio.open_connection(*addres) #새 기능 
    client = AsyncClient(*streams) #새 기능

    async with client.session(1,5,3):
        results = [(x, await client.report_outcome(x))
                    async for x in client.request_number(5)]
        
        async with client.seesion(10,15,12):
            async for number in client.request_numbers(5):
                outcome = await client.report_outcome(number)
                results.append((number, outcome))

        _, writer = streams #새 기능
        writer.close() #새 기능
        await writer.wait_closed() #새 기능

        return results
"""
다음 코드는 asyncio.create_ta sk 를 해 서버를 에 
넣은 후 이벤트 루프에서 실행한다. 그 후 코드가 await 에 도달하면 클라이 
트 와 서버가 병렬로 행된다. 이 방식은 asy ncio.gather 의 동작 
는 른 팬아웃- 접근 방법이다.
"""

async def main_async():
    address = ('127.0.0.1', 4321)

    server = run_async_server(address)
    asyncio.create_task(server)

    results = await run_async_client(address)
    for number, outcome in results:
        print(f'클라이언트: {number}는 {outcome}')
        

Symmary

  • 파이썬은 for 루프, with 문, 제너레이터, 컴프리헨션의 비동기 버전을 제공하고, 코루틴 안에서 기존 라이브러리 도우미 함수를 대신해 즉시 사용할 수 있는 대안 제공

  • asyncio 내장 모듈을 사용하면 스레드와 블로킹 I/O를 사용하는 기존 코드를 코루틴과 비동기 I/P를 사용하는 코드로 쉽게 포팅

profile
성장을 도울 아카이빙 블로그

0개의 댓글