asyncio.run(run_tasks_mixed(handles, 0.1, output_path))
confirm_merge(input_paths, output_path)
tmpdir.cleanup()
async def run_tasks_mixed(handles, interval, output_path):
# get_event_loop: 현재의 이벤트 루프를 가져옵니다.
loop = asyncio.get_event_loop()
with open(output_path, 'wb') as output:
async def write_async(data):
output.write(data)
def write(data):
coro = write_async(data)
future = asyncio.run_coroutine_threadsafe(
coro, loop)
future.result()
futures = []
for handle in handles:
future = loop.run_in_executor(
None, tail_file, handle, interval, write)
futures.append(future)
# asyncio.gather 과 await을 이용하여, tail_file 이 모두 종료되도록 팬인 시킨다.
await asyncio.gather(*futures)
def tail_file(handle, interval, write_func):
"""
Objectives
- 새로운 줄이 들어오면, 주어진 콜백 함수로 그 줄을 전달해서 -> 출력 로그에 데이터를 기록한다.
- 데이터가 없는 경우 -> 데이터를 폴링하며 대기하느라 CPU를 낭비하는 시간을 줄이기 위해 thread를 일시 정지 상태로 만든다.
- 폴링
- 하나의 장치(또는 프로그램)가 충돌 회피 또는 동기화 처리 등을 목적으로 다른 장치(또는 프로그램)의 상태를 주기적으로 검사하여 일정한 조건을 만족할 때 송수신 등의 자료처리를 하는 방식을 말한다
"""
while not handle.closed:
try:
line = readline(handle)
except NoNewData:
time.sleep(interval)
else:
write_func(line)
asyncio.run(run_fully_async(handles, 0.1, output_path))
confirm_merge(input_paths, output_path)
tmpdir.cleanup()
async def run_fully_async(handles, interval, output_path):
async with WriteThread(output_path) as output:
tasks = []
for handle in handles:
coro = tail_async(handle, interval, output.write)
task = asyncio.create_task(coro)
tasks.append(task)
await asyncio.gather(*tasks)
class WriteThread(Thread):
def __init__(self, output_path):
super().__init__()
self.output_path = output_path
self.output = None
self.loop = asyncio.new_event_loop()
def run(self):
asyncio.set_event_loop(self.loop)
with open(self.output_path, 'wb') as self.output:
self.loop.run_forever()
# 맨 마지막에 한번 더 이벤트 루프를 실행해서
# 다른 이벤트 루프가 stop()에 await하는 경우를 해결한다.
self.loop.run_until_complete(asyncio.sleep(0))
async def real_write(self, data):
self.output.write(data)
async def write(self, data):
coro = self.real_write(data)
future = asyncio.run_coroutine_threadsafe(
coro, self.loop)
await asyncio.wrap_future(future)
###
async def tail_async(handle, interval, write_func):
loop = asyncio.get_event_loop()
while not handle.closed:
try:
line = await loop.run_in_executor(
None, readline, handle)
except NoNewData:
await asyncio.sleep(interval)
else:
await write_func(line)
기본
하향식(top-down)
상향식(down-top)