多线程实现多路rtsp视频opencv处理

时间:2025-02-27 07:38:43

多路视频需要并行分析,其中需要使用一些深度学习模型进行目标检测和跟踪等操作.

一开始采用多进程,但是实验下来发现每个进程会分配一个GPU模型,无法共享全局模型/变量. 系统压力巨大,无法分配这么多GPU显存.

于是把模型封装成http服务api,让多进程调用服务进行视频分析,结果发现路数一多会崩.

最终还是考虑了多线程:

多线程可以共享全局变量/模型!!!!

类用于接收串流影像,降低延迟

2.利用的ThreadPool,可以控制进程一样控制线程

3.利用map函数实现多线程并行处理图像,并放入缓存队列

4.从缓存队列中获取图像结果并显示出来

# -*- coding: utf-8 -*-
import cv2
import time
import threading
from  import Pool as ThreadPool
import numpy as np
from collections import deque
from time import sleep
 
# 接收摄影机串流影像,采用多线程的方式,降低缓冲区栈图帧的问题。
class ipcamCapture:
    def __init__(self, URL):
         = []
         = URL
         = False
         = False
		
	# 摄影机连接。
         = (URL)
 
    def start(self):
	# 把程序放进子线程,daemon=True 表示该线程会随着主线程关闭而关闭。
        print('ipcam started!')
        (target=, daemon=True, args=()).start()
 
    def stop(self):
	# 记得要设计停止无限循环的开关。
         = True
        print('ipcam stopped!')
   
    def getframe(self):
	# 当有需要影像时,再回传最新的影像。
        return 
        
    def queryframe(self):
        while (not ):
            ,  = ()            
        ()
 
def image_infer(source_id):
    i= source_id
    try:
        frame = ipcams[i].getframe() 
        #****************do someting with the frame************#
        # Fortunately,global variables can be used here!       #
        #eg.                                                   #
        # detection#                                   # 
        #                                        #
        #                                            #
        #4...                                                  #
        ########################################################
        recent_Frames[i].append(frame) #最终结果放入缓存队列
    except Exception as e:
        Nosignal_wall_paper = ((400,720,3))
        (Nosignal_wall_paper,'NOSIGNALS',(100,250),cv2.FONT_HERSHEY_SIMPLEX,3,(0,255,0),15)
        recent_Frames[i].append(Nosignal_wall_paper)
        print(e.__traceback__.tb_lineno,)               
 
def multi_infer(source_ids):
    pool = ThreadPool(8)
    while True:
        start_time = ()
        (image_infer,source_ids)
        print('用时共: %s ms' % ((() - start_time)*1000))
        # 多线程无法在内部show图像,所以把图像结果缓存到队列中,最后一起显示
        for i,Frame in enumerate(recent_Frames):
            ('source{}'.format(i),(Frame[0],(720,400))) #队列第一帧
            (1)
 
if __name__ == "__main__":
    URLS = [
    "rtsp://**********",
    "rtsp://**********",
    "rtsp://**********",
    "rtsp://**********"] #rtsp地址列表
 
    ipcams =[] #摄像机对象列表
    recent_Frames= [deque(maxlen=10) for _ in range(len(URLS))]#存放结果图像的队列
    source_ids = list(range(len(URLS))) #rtsp源编号
    for URL in URLS:
        # 连接摄影机
        ipc = ipcamCapture(URL)
        (ipc)
    for  ipcam in  ipcams:
        # 启动子线程
        ()
    # 暂停1秒,确保影像已经填充
    (1)
    # 使用无穷循环撷取影像,直到按下Esc键结束
    multi_infer(source_ids)