[PJT] RPI-리모콘 연결

정재훈·2022년 5월 24일
0

간단 PJT

목록 보기
6/6

DB 테스트

RC Car에서 작업 디렉토리 이동 후 vi ./app.py후 해당 코드를 테스트 해보자!

import mysql.connector

db = mysql.connector.connect(host='AWS IP 주소`, user=`관리자 계정 ID', password='관리자 PW', database='생성한 스키마명', auth_plugin='mysql_native_password')
cur = db.cursor()

#query
cur.execute("select * from command")

#print
for (id, time, cmd_string, arg_string, is_finish) in cur:
    print(id, time, cmd_string, arg_string, is_finish)

cur.close()
db.close()

Timer 사용하기

임베디드에서 Timer는 Thread가 아니라 Interrupt로 동작됩니다. 이와 달리, Python 에서 Timer는 Threading으로 동작되어, Main Thread와 Timer Thread가 동시에 동작 됩니다.

import mysql.connector
from threading import Timer
from time import sleep

def polling():
    print("HI")

    timer = Timer(2, polling)
    timer.start()

polling()

while True: pass

명령어 미리 받기 위한 Timer

다음 코드를 응용하여

Polling Thread : Command Query 로 받은 데이터를 “ready” 객체에 저장한다.
Main Thread : “ready” 객체에 저장된 명령을 수행한다. (Motor 제어)

와 같이 구현할 예정입니다.

import mysql.connector
from threading import Timer
from time import sleep
import signal
import sys

def closeDB(signal, frame):
    print("BYE")
    timer.cancel()
    sys.exit(0)

def polling():
    print("HI")   
    global timer
    timer = Timer(2, polling)
    timer.start()

timer = None
signal.signal(signal.SIGINT, closeDB)
polling()

while True: 
    print("MAIN")
    sleep(0.5)
    pass

지속적으로 Query 받기

import mysql.connector
from threading import Timer
from time import sleep
import signal
import sys

def closeDB(signal, frame):
   print("BYE")
   cur.close()
   db.close()
   timer.cancel()
   sys.exit(0)

def polling():
   global cur, db
   
   # 가장 최근의 레코드를 지속적으로 가져오는 소스코드
   cur.execute("select * from command order by time desc limit 1") 
   for (id, time, cmd_string, arg_string, is_finish) in cur:
       print(cmd_string, arg_string)

   db.commit()
    
   global timer
   timer = Timer(1, polling)
   timer.start()

#init
db = mysql.connector.connect(host='AWS IP 주소`, user=`관리자 계정 ID', password='관리자 PW', database='생성한 스키마명', auth_plugin='mysql_native_password')
cur = db.cursor()
timer = None
signal.signal(signal.SIGINT, closeDB)
polling()

#main thread
while True:
   print("KFC MAIN")
   sleep(0.5)
   pass

코드

import mysql.connector
from threading import Timer
from time import sleep
import signal
import sys

def closeDB(signal, frame): # 신호시 종료
    print("BYE")
    cur.close()
    db.close()
    timer.cancel()
    sys.exit(0)

def polling(): # 최근 명령어 가져오기
    global cur, db, ready
    
    cur.execute("select * from command order by time desc limit 1")
    for (id, time, cmd_string, arg_string, is_finish) in cur:
        if is_finish == 1 : break
        ready = (cmd_string, arg_string)
        cur.execute("update command set is_finish=1 where is_finish=0") # is_finish 1로 셋팅

    db.commit()
     
    global timer
    timer = Timer(0.1, polling)
    timer.start()

def go():
    print("go")

def back():
    print("back")

def stop():
    print("stop")

def left():
    print("left")

def mid():
    print("mid")

def right():
    print("right")

#init
db = mysql.connector.connect(host='13.125.225.160', user='mincoding', password='1234', database='minDB', auth_plugin='mysql_native_password')
cur = db.cursor()
ready = None
timer = None
signal.signal(signal.SIGINT, closeDB)
polling()

#main thread
while True:
    sleep(0.1)
    if ready == None : continue

    cmd, arg = ready
    ready = None

    if cmd == "go" : go()
    if cmd == "back" : back()
    if cmd == "stop" : stop()
    if cmd == "left" : left()
    if cmd == "mid" : mid()
    if cmd == "right" : right()
profile
여러 방향으로 접근하는 개발자

0개의 댓글