영상분석 서버를 구현하기 전에 우선 화재감지 프로젝트 를 따라하면서 감을 잡아보기로 했다.
내 고물 노트북은 cpu가 노쇠하여 2.X 이상의 버전이 실행되지 않는다. aws에서 컴퓨터를 대여하기로 했다. 그런데 아마존은 커맨드라인 인터페이스라 이미지나 영상을 띄워서 실제로 잘 되는지 볼 수가 없다. 답답해서 Xming도 설치해보고, 우분투 데스크탑을 설치한 후 윈도우에서 원격 연결을 해봤지만 둘 다 심하게 버벅거리고 자꾸 튕겼다. 그냥 프레임을 출력해서 CCTV 영상을 실시간으로 받아오는지 확인만하고, 감지하면 저장해서 보기로 했다.
CCTV가 아직 없어서 남는 스마트폰을 웹캠으로 만든다음 영상을 실시간으로 받아오기로 했다. 그러나 http:// 형식으로 시작하는 스트리밍 영상은 우분투에서 opencv로 읽어와지지 않았다. 혹시나 싶어서 공공 데이터 포털에 공개된 rtsp:// 형식은 잘 받아와졌다. 밤이면 조명도 불로 인식할 수도 있으니, 교통 cctv 영상을 가져와서 화재감지 모델에 돌려보기로 했다.
화재감지 cctv 프로젝트가 3년전의 코드다보니 쓰이지 않는 함수들이 많았다. 에러가 날때마다 구글링으로 해결했다.
import cv2
import os
import Constants
import queue
import threading
import datetime
import sys
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import time
def init():
global img_file_path, frame_queue, fourcc
print("init")
frame_queue = queue.Queue() #프레임 큐
fourcc = cv2.VideoWriter_fourcc(*'XVID') #포맷
def set_video_writer(video_name, fourcc, FPS, width, height):
return cv2.VideoWriter(video_name, fourcc, FPS, (width, height))
#CCTV 영상 실시간으로 가져와서 프레임 큐에 저장
def connect_cctv():
global frame_queue,fource, cur_frame
url = 'rtsp://210.99.70.120:1935/live/cctv001.stream' #공공 데이터 센터에서 제공한 공공 CCTV
try:
cap = cv2.VideoCapture(url)
print("연결성공: ", url)
except:
print("연결 실패")
while True:
ret, frame = cap.read()
#전역 변수 프레임 큐에 프레임 담음
if frame_queue.qsize() < Constants.QUEUE_SIZE:
frame_queue.put(frame)
else:
frame_queue.get()
frame_queue.put(frame)
#실시간으로 화재 감지
def run_detector():
global frame_queue
print("run_detector")
from utils import label_map_util
from utils import visualization_utils as vis_util
sys.path.append("..")
MODEL_NAME = 'inference_graph'
CWD_PATH = os.getcwd() # 현재 경로
PATH_TO_CKPT = os.path.join(CWD_PATH,MODEL_NAME,'frozen_inference_graph.pb') # 모델 경로
PATH_TO_LABELS = os.path.join(CWD_PATH,'training','label_map.pbtxt') # pbtext 파일
NUM_CLASSES = 1
#라벨링 맵 로드
label_map = label_map_util.load_labelmap(PATH_TO_LABELS)
categories = label_map_util.convert_label_map_to_categories(label_map, max_num_classes=NUM_CLASSES, use_display_name=True)
category_index = label_map_util.create_category_index(categories)
# 텐서플로우 모델 로드
detection_graph = tf.Graph()
with detection_graph.as_default():
od_graph_def = tf.compat.v1.GraphDef()
with tf.io.gfile.GFile(PATH_TO_CKPT, 'rb') as fid:
serialized_graph = fid.read()
od_graph_def.ParseFromString(serialized_graph)
tf.import_graph_def(od_graph_def, name='')
sess = tf.compat.v1.Session(graph=detection_graph)
#인풋
image_tensor = detection_graph.get_tensor_by_name('image_tensor:0')
# 아웃풋
detection_boxes = detection_graph.get_tensor_by_name('detection_boxes:0') # 박스
detection_scores = detection_graph.get_tensor_by_name('detection_scores:0') #점수
detection_classes = detection_graph.get_tensor_by_name('detection_classes:0')
num_detections = detection_graph.get_tensor_by_name('num_detections:0')
print(detection_scores, num_detections)
while True:
if frame_queue.empty() == True:
continue
image = frame_queue.get()
image_expanded = np.expand_dims(image, axis=0)
(boxes, scores, classes, num) = sess.run(
[detection_boxes, detection_scores, detection_classes, num_detections],
feed_dict={image_tensor: image_expanded})
# 박스, 라벨 표시
det_ret_list = vis_util.visualize_boxes_and_labels_on_image_array(
image,
np.squeeze(boxes),
np.squeeze(classes).astype(np.int32),
np.squeeze(scores),
category_index,
use_normalized_coordinates=True,
line_thickness=8,
min_score_thresh=0.80)
#불 감지
if 0 < len(det_ret_list):
for det_ret in det_ret_list:
cats_list = det_ret["cats"]
for cats_str in cats_list:
splt_cat = cats_str.split(":")
print("불 감지됨, 감지시간: ", time.strftime('%H시%M분%S초'))
cv2.imwrite(os.path.join('fire', time.strftime('%H시%M분%S초') + '.jpg'), image)
else:
print("감지 안됨")
continue
if __name__ == "__main__":
init()
t1 = threading.Thread(target=connect_cctv)
t1.start()
run_detector()
t1.join()