# Python搭建RTMP流媒体服务器,实现网络摄像头的推流,并使用机器视觉模型Yolo-v3实时处理视频、输出分类标签。
# -*- coding: utf-8 -*-
"""
Created on Mon Jul 6 22:21:49 2020
@author: Straka
"""
# =============================================================================
# """
# 1 开启TomCat
# 2 开启NGINX(cmd)
# C:
# cd C:\Users\Administrator\Desktop\nginx
# start nginx
# nginx -s reload
# 3 开启 C:\ffmpeg-4.3-win64-static\bin\MulHYZY_dealOnly
# """
# =============================================================================
# How many times per second the `send` thread publishes a result.
dealFps = 1/2

import time, datetime
import queue
import threading
import subprocess as sp
import numpy as np
import mxnet as mx
import os
os.environ["MXNET_CUDNN_AUTOTUNE_DEFAULT"] = "0"  # skip cuDNN autotune for faster startup
os.chdir(r'C:/BabyCam')
import gluoncv as gcv
gcv.utils.check_version('0.6.0')
from gluoncv.model_zoo import get_model
print("包加载完毕")
from gluoncv.utils import try_import_cv2
cv = try_import_cv2()

# Detection confidence threshold used both for NMS and for drawing/labeling.
nms_thresh = 0.45
# English label names, kept for reference only (the original assigned this list
# and immediately overwrote it with the Chinese one below):
# ['asleep', 'awake', 'crying', 'quilt kicked', 'side asleep', 'on stomach', 'face covered']
classes = ['熟睡','清醒','哭闹','踢被','侧睡','趴睡', '捂被',]
dealCtx = [mx.gpu(0)]  # single-GPU inference context

# ---- model loading --------------------------------------------------------
model_dir = r'C:\BabyCam\model/yolov3/yolo3_darknet53_custom_best.params'
net_name = '_'.join(('yolo3', 'darknet53', 'custom'))

def _load_net(tag):
    """Build, load and hybridize one YOLOv3 instance.

    One independent copy is created per worker thread so the four `deal*`
    threads never share a network object.  `tag` is only used for the
    progress message.
    """
    model = get_model(net_name, classes=classes, ctx=dealCtx)
    model.load_parameters(model_dir, ctx=dealCtx)
    model.set_nms(nms_thresh=0.45, nms_topk=400)
    mx.nd.waitall()  # make sure parameters are fully copied to the GPU
    # BUGFIX: the original code hybridized netC twice and never hybridized
    # netD; routing every copy through this helper fixes that.
    model.hybridize()
    print("模型%s加载完毕" % tag)
    return model

net = _load_net('A')
netB = _load_net('B')
netC = _load_net('C')
netD = _load_net('D')

# ---- warm-up --------------------------------------------------------------
# Push one dummy grey frame through every model so lazy initialization
# (shape inference, kernel compilation) does not delay the first real frame.
tempframe = mx.nd.array(cv.cvtColor(np.ones((720, 1080, 3), np.uint8) * 128,
                                    cv.COLOR_BGR2RGB)).astype('uint8')
rgb_nd, scaled_frame = gcv.data.transforms.presets.yolo.transform_test(
    tempframe, short=416, max_size=1024)
rgb_nd = rgb_nd.as_in_context(dealCtx[0])
class_IDs, scores, bounding_boxes = net(rgb_nd)
class_IDs, scores, bounding_boxes = netB(rgb_nd)
class_IDs, scores, bounding_boxes = netC(rgb_nd)
class_IDs, scores, bounding_boxes = netD(rgb_nd)
person = np.sum(class_IDs == 0)  # forces the async GPU work to complete
hat = np.sum(class_IDs == 1)
scale = 1.0 * tempframe.shape[0] / scaled_frame.shape[0]
img, result = gcv.utils.viz.cv_plot_bbox(
    tempframe.asnumpy(), bounding_boxes[0], scores[0], class_IDs[0],
    class_names=net.classes, scale=scale, thresh=nms_thresh)
del img, tempframe, rgb_nd, class_IDs, scores, bounding_boxes, person, hat, scale, result
print("初始化完毕")
from tensorflow.keras.preprocessing.image import array_to_img
def showImg(frame):
    """Debug helper: open *frame* (an HxWxC array) in the default image viewer."""
    img = array_to_img(frame)
    img.show()
# =============================================================================
# import datetime
# cap = (r"rtmp://0.0.0.0:1936/live/3")
# def read():
# now_time = ((),'%H:%M:%S')
# ret, frame = ()
# # showImg(frame)
# print(now_time)
# tempframe = ((frame, cv.COLOR_BGR2RGB)).astype('uint8')
# rgb_nd, scaled_frame = .transform_test(tempframe, short=416, max_size=1024)
# rgb_nd = rgb_nd.as_in_context(dealCtx[0])
# class_IDs, scores, bounding_boxes = net(rgb_nd)
# person = (class_IDs==0)
# hat = (class_IDs==1)
# scale = 1.0 * [0] / scaled_frame.shape[0]
# img, result = .cv_plot_bbox((), bounding_boxes[0], scores[0], class_IDs[0], class_names=, scale=scale, thresh=nms_thresh)
# showImg(img)
# del img,tempframe,frame
# print(result)
# read()
#
# =============================================================================
class Live(object):
    """RTMP camera → YOLOv3 pipeline.

    One producer thread (`read_frame`) pulls frames from the RTMP stream and
    round-robins them into four single-slot queues.  Four worker threads
    (`dealA`..`dealD`, each with its own model copy) classify the frames and
    accumulate per-class hit counts.  A reporter thread (`send`) periodically
    writes the dominant label to `infoUrl` and resets the counters.
    """

    def __init__(self):
        self.fps = 25                       # overwritten once the stream is opened
        # One queue per worker thread; `maxqueue` keeps only the newest frame.
        self.frame_queueA = queue.Queue()
        self.frame_queueB = queue.Queue()
        self.frame_queueC = queue.Queue()
        self.frame_queueD = queue.Queue()
        self.maxqueue = 1
        # BUGFIX: the original literal r"D:\" is a SyntaxError (a raw string
        # cannot end in a backslash) and named a bare drive root, which
        # open(..., 'w') cannot write.  NOTE(review): confirm this file name
        # against whatever consumes the label file.
        self.infoUrl = "D:\\babycam_result.txt"
        self.camera_path = r"rtmp://0.0.0.0:1936/live/3"
        self.count = np.zeros(7)            # per-class hit counter, len(classes) == 7
        self.height = 720
        self.width = 1280
        self.dealTimes = 0                  # frames processed since the last report
        self.lastShow = int(time.time())

    # ---- capture helpers ---------------------------------------------------
    def _open_capture(self):
        """Open the RTMP stream with low-latency capture settings applied."""
        cap = cv.VideoCapture(self.camera_path)
        cap.set(cv.CAP_PROP_BUFFERSIZE, 3)  # internal buffer stores only 3 frames
        cap.set(cv.CAP_PROP_FOURCC, cv.VideoWriter_fourcc('M', 'J', 'P', 'G'))
        return cap

    def _grab_into(self, cap, frame_queue):
        """Read one frame into *frame_queue*, reconnecting until a frame arrives.

        Returns the (possibly re-opened) capture object.  Stale frames are
        dropped so the worker always sees the newest one.  The original code
        repeated this logic four times and only configured the low-latency
        settings on two of the reconnect paths; now all paths agree.
        """
        ret, frame = cap.read()
        if not ret:
            print("尝试重新连接")
            while not ret:
                cap = self._open_capture()
                ret, frame = cap.read()
            print("重新连接成功")
        while frame_queue.qsize() >= self.maxqueue:
            frame_queue.get()
        frame_queue.put(frame)
        return cap

    # ---- producer ----------------------------------------------------------
    def read_frame(self):
        """Producer thread: connect to the camera, then feed the four queues."""
        cap = cv.VideoCapture(self.camera_path)
        while not cap.isOpened():
            print("尝试重新连接")
            cap = cv.VideoCapture(self.camera_path)
        # Some streams report fps == 0 until they are actually live; retry.
        self.fps = int(cap.get(cv.CAP_PROP_FPS))
        while self.fps == 0:
            print("尝试重新连接")
            cap = cv.VideoCapture(self.camera_path)
            self.fps = int(cap.get(cv.CAP_PROP_FPS))
        self.width = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))
        self.height = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))
        print("开启接收", self.camera_path)
        print(self.width, self.height, self.fps)
        cap.set(cv.CAP_PROP_BUFFERSIZE, 3)  # internal buffer stores only 3 frames
        cap.set(cv.CAP_PROP_FOURCC, cv.VideoWriter_fourcc('M', 'J', 'P', 'G'))
        queues = (self.frame_queueA, self.frame_queueB,
                  self.frame_queueC, self.frame_queueD)
        while cap.isOpened():
            # Round-robin: one frame per worker queue per loop iteration,
            # exactly as the original four copy-pasted sections did.
            for frame_queue in queues:
                cap = self._grab_into(cap, frame_queue)

    # ---- workers -----------------------------------------------------------
    def _deal(self, frame_queue, model):
        """Worker loop: classify frames from *frame_queue* with *model*.

        Each recognized label (any member of `classes`) increments the shared
        `self.count` histogram; `self.dealTimes` counts processed frames.
        """
        while True:
            if frame_queue.empty():
                continue  # busy-wait, matching the original structure
            frame = frame_queue.get()
            image = np.asarray(frame, dtype=np.uint8)
            del frame
            # BGR (OpenCV) -> RGB mxnet NDArray; this conversion dominates runtime.
            frame = mx.nd.array(cv.cvtColor(image, cv.COLOR_BGR2RGB)).astype('uint8')
            del image
            rgb_nd, scaled_frame = gcv.data.transforms.presets.yolo.transform_test(
                frame, short=416, max_size=1024)
            rgb_nd = rgb_nd.as_in_context(dealCtx[0])
            class_IDs, scores, bounding_boxes = model(rgb_nd)
            scale = 1.0 * frame.shape[0] / scaled_frame.shape[0]
            # cv_plot_bbox is used only for its `result` label list here.
            img, result = gcv.utils.viz.cv_plot_bbox(
                frame.asnumpy(), bounding_boxes[0], scores[0], class_IDs[0],
                class_names=net.classes, scale=scale, thresh=nms_thresh)
            del img
            for label in result:
                if label in classes:
                    self.count[classes.index(label)] += 1
            self.dealTimes += 1

    def dealA(self):
        """Worker thread A — consumes queue A with model `net`."""
        print("处理A线程开始")
        self._deal(self.frame_queueA, net)

    def dealB(self):
        """Worker thread B — consumes queue B with model `netB`."""
        print("处理B线程开始")
        self._deal(self.frame_queueB, netB)

    def dealC(self):
        """Worker thread C — consumes queue C with model `netC`."""
        print("处理C线程开始")
        self._deal(self.frame_queueC, netC)

    def dealD(self):
        """Worker thread D — consumes queue D with model `netD`."""
        print("处理D线程开始")
        self._deal(self.frame_queueD, netD)

    # ---- reporter ----------------------------------------------------------
    def send(self):
        """Reporter thread: every 1/dealFps seconds write the dominant label.

        If the most frequent label appeared more than twice in the window it
        is written (as a one-element list) to `infoUrl`; otherwise an empty
        list is written.  Counters are reset after each report.
        """
        time.sleep(8)  # give the capture/worker threads time to warm up
        print("信息发送线程开始")
        while True:
            result = [classes[np.argmax(self.count)]] if np.max(self.count) > 2 else []
            print(self.count, result, self.dealTimes)
            with open(self.infoUrl, 'w', encoding='utf8') as f:
                f.write(str(result))
            self.count = np.zeros(7)
            self.dealTimes = 0
            time.sleep(1 / dealFps)

    def run(self):
        """Start producer, workers and reporter as daemon threads."""
        targets = (self.read_frame, self.dealA, self.dealB,
                   self.dealC, self.dealD, self.send)
        # `daemon=True` replaces the deprecated Thread.setDaemon().
        threads = [threading.Thread(target=t, daemon=True) for t in targets]
        for t in threads:
            t.start()
# (r'C:\ffmpeg-4.3-win64-static\bin') # ffmpeg所在地址
# Script-style entry point: build the pipeline object and start all threads
# immediately at import time (the threads are daemons; the interactive
# session / main thread keeps the process alive).
L = Live()
L.run()
def checkTime():
    """Debug helper: print the current wall-clock time and display the next
    frame waiting in queue A (converted from OpenCV BGR to RGB)."""
    now = datetime.datetime.now()
    print(now.strftime('%H:%M:%S'))
    frame = L.frame_queueA.get()
    showImg(cv.cvtColor(frame, cv.COLOR_BGR2RGB))