import websockets
import asyncio
import serial
import cv2
import numpy
import base64
async def serial_task(websocket):
    """Read lines from the serial port and relay '1 close' reports to the websocket.

    NOTE(review): `box1` is not defined anywhere in this listing; presumably
    a module-level serial port object -- confirm.
    """
    # The loop body contains no `await`, so once this coroutine starts running
    # it never hands control back to the event loop.
    while 1 :
        ardRes1 = box1.readline().decode('utf-8').strip()
        print('from ardu',ardRes1)
        if '1 close' in ardRes1:
            print('websockets sent', ardRes1)
            # NOTE(review): send() is awaited elsewhere in this file; here its
            # result is discarded without `await`, so presumably nothing is
            # actually transmitted -- confirm.
            websocket.send(ardRes1)
async def websocket_task(websocket):
    """Handle commands from the websocket and send back encoded camera frames.

    NOTE(review): the nesting below is reconstructed; the original
    indentation of this listing was lost, so which statements belong to the
    `if` is an assumption. `box1`, `capture2` and `encode_param` are not
    defined anywhere in this listing.
    """
    # `stringData` is assigned further down in this function, which makes it a
    # local name; reading it here, before that assignment, raises
    # UnboundLocalError.
    await websocket.send((str(len(stringData))).encode().ljust(16) + stringData)
    while 1:
        # NOTE(review): recv() is not awaited here, so `res` is not the
        # received message and the comparison with '1 open' below cannot
        # match -- confirm against the websockets API.
        res = websocket.recv()
        print('wait')
        # `await` is applied to the value returned by .decode(), not to an
        # awaitable. NOTE(review): presumably this raises TypeError -- confirm.
        ardRes2 = await box1.readline().decode('utf-8')
        if res == '1 open':
            box1.write('1 open'.encode())
            print("box 1 open")
        # Grab one frame, JPEG-encode it and send its textual representation.
        ret2, frame2 = capture2.read()
        result, img_encode1 = cv2.imencode('.jpg', frame2, encode_param)
        data = numpy.array(img_encode1)
        stringData = data.tostring()
        #print (stringData)
        await websocket.send(str(stringData))
async def main():
    """Open the locker websocket and schedule both tasks on that connection."""
    async with websockets.connect("wss://www.share42-together.com:8088/ws/locker", max_size=2048576) as websocket:
        task1 = asyncio.create_task(serial_task(websocket))
        task2 = asyncio.create_task(websocket_task(websocket))
        # NOTE(review): the gather() result is not awaited, so nothing here
        # waits for the two tasks; the `async with` block and main() end right
        # after scheduling them.
        asyncio.gather(task1, task2)
# Script entry point: run main() on a fresh event loop.
asyncio.run(main())
문제점

`gather` 메서드를 사용했기 때문에 두 함수가 동시에 실행될 것이라 기대했지만, `serial_task`의 `await`에서 멈추고 이후로는 실행이 되지 않음.
`await`를 무분별하게 사용하면 안 됨.
두 번째 해결방안

각 `task`마다 큐를 하나씩 만들어 준다. `asyncio`는 자체적인 `queue`를 제공한다.

코드
import websockets
import asyncio
import serial
import cv2
import json
import numpy
import base64
from asyncio import Queue
# Options list passed to cv2.imencode when JPEG-encoding camera frames:
# the JPEG quality flag followed by its value.
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 90]

# Remote endpoint settings.
PROTOCOL = "wss"  # ws / wss
SERVER_URL = "www.share42-together.com"
def _encode_serial_report(line: str):
    """Turn one serial report line into the JSON text forwarded to the server.

    The line is expected to hold at least three whitespace-separated fields,
    ``<number> <command> <weight>``, where number and weight are integers.

    Returns:
        The JSON string, or ``None`` when the line does not have that shape.
    """
    fields = line.split()
    try:
        return json.dumps({
            'number': int(fields[0]),
            'command': fields[1],
            'weight': int(fields[2]),
            'cam': None,
        })
    except (IndexError, ValueError):
        return None


async def serial_task(serial_queue: Queue, websocket_queue: Queue):
    """Bridge the serial port and the two queues.

    Lines read from ``/dev/ttyACM0`` are converted to JSON and put on
    ``websocket_queue``; ``'1 open'`` / ``'2 open'`` commands found on
    ``serial_queue`` are written back to the port.

    Args:
        serial_queue: commands destined for the serial device.
        websocket_queue: JSON messages destined for the websocket.
    """
    loop = asyncio.get_running_loop()
    box1 = serial.Serial(port="/dev/ttyACM0",
                         baudrate=9600,
                         bytesize=serial.EIGHTBITS,
                         parity=serial.PARITY_NONE,
                         timeout=1)
    try:
        while True:
            # readline() blocks for up to the 1 s timeout; running it in the
            # default executor keeps the event loop free for the websocket.
            raw = await loop.run_in_executor(None, box1.readline)
            # Line noise must not kill the task, so undecodable bytes are
            # replaced instead of raising UnicodeDecodeError.
            ardRes1 = raw.decode('utf-8', errors='replace').strip()
            if ardRes1:
                object1 = _encode_serial_report(ardRes1)
                if object1 is None:
                    print('ignored malformed serial line', repr(ardRes1))
                else:
                    print(object1)
                    await websocket_queue.put(object1)
            # Empty reads (timeouts) are no longer queued: the consumer in
            # this file discards falsy items anyway.

            # Write every pending command, not just one per iteration.
            while not serial_queue.empty():
                value_to_send = serial_queue.get_nowait()
                if value_to_send in ('1 open', '2 open'):
                    box1.write(value_to_send.encode())
    finally:
        # Release the port even when the task is cancelled or fails.
        box1.close()
# Websocket command -> (locker number reported back, OpenCV device index).
_CAMERAS = {'1 cam': (1, 0), '2 cam': (2, 1)}
# Websocket commands that are forwarded unchanged to the serial task.
_OPEN_COMMANDS = ('1 open', '2 open')


def _capture_cam_message(number: int, device_index: int):
    """Grab one frame from a camera and build the JSON 'cam' message.

    Args:
        number: locker number placed in the message.
        device_index: index handed to ``cv2.VideoCapture``.

    Returns:
        The JSON string carrying the base64-encoded JPEG, or ``None`` when
        the camera could not be opened.

    NOTE(review): a frame read that fails on an opened camera is not handled
    here (same as before); ``cv2.imencode`` will raise in that case.
    """
    capture = cv2.VideoCapture(device_index)
    try:
        # Check the device before reading from it.
        if not capture.isOpened():
            return None
        _, frame = capture.read()
    finally:
        # Always release the camera that was actually opened here.
        capture.release()
    _, encoded = cv2.imencode('.jpg', frame, encode_param)
    payload = base64.b64encode(numpy.array(encoded))
    return json.dumps({
        'number': number,
        'command': "cam",
        'weight': 0,
        'cam': payload.decode('utf8'),
    })


async def websocket_task(url: str, websocket_queue: Queue, serial_queue: Queue):
    """Receive commands from the server and dispatch them.

    ``'N open'`` commands are put on ``serial_queue``; ``'N cam'`` commands
    capture a frame and put the resulting JSON on ``websocket_queue``.
    Everything placed on ``websocket_queue`` is sent by a background
    ``consume_queue`` task bound to this connection.

    Args:
        url: websocket endpoint to connect to.
        websocket_queue: outgoing messages for the server.
        serial_queue: commands for the serial task.

    Raises:
        SystemExit: when a requested camera cannot be opened.
    """
    loop = asyncio.get_running_loop()
    async with websockets.connect(url, max_size=2048576) as websocket:
        # Keep a reference so the sender task cannot be garbage-collected
        # mid-flight, and so it can be cancelled when the connection ends.
        sender = asyncio.create_task(consume_queue(websocket, websocket_queue))
        try:
            while True:
                res = await websocket.recv()
                print('from websocket', res)
                if res in _OPEN_COMMANDS:
                    await serial_queue.put(res)
                elif res in _CAMERAS:
                    number, device_index = _CAMERAS[res]
                    # Camera open/read/encode is blocking work; keep it off
                    # the event loop.
                    cam_data = await loop.run_in_executor(
                        None, _capture_cam_message, number, device_index)
                    if cam_data is None:
                        print(f'cam{number} connect fail')
                        raise SystemExit
                    await websocket_queue.put(cam_data)
        finally:
            sender.cancel()
async def consume_queue(websocket, queue: Queue):
    """Forward items taken from ``queue`` to ``websocket`` indefinitely.

    Falsy items (for example empty strings) are taken off the queue and
    dropped without being sent.
    """
    while True:
        message = await queue.get()
        if not message:
            continue
        await websocket.send(message)
async def main():
    """Create the two queues and run the serial and websocket tasks together.

    ``websocket_queue`` carries messages towards the server and
    ``serial_queue`` carries commands towards the serial device; both tasks
    receive both queues.
    """
    websocket_queue = Queue()
    serial_queue = Queue()
    # Built from the module-level settings so they are the single place to
    # change the endpoint; evaluates to the same URL as before.
    url = f"{PROTOCOL}://{SERVER_URL}:8088/ws/locker"
    task1 = asyncio.create_task(serial_task(serial_queue, websocket_queue))
    task2 = asyncio.create_task(websocket_task(url, websocket_queue, serial_queue))
    await asyncio.gather(task1, task2)


# Only start the event loop when executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())
`queue`를 활용해 값이 있는지 없는지를 확인함으로써 쓸모없는 기다림을 없애고, 두 작업이 멀티쓰레딩처럼 동시에 동작하도록 구현할 수 있었다.