python源码:opencv多视频源同屏拼接播放

时间:2024-07-05 11:09:54

一、前言

        如标题所示,这个python代码的目的是利用opencv模块实现多视频源同屏拼接播放的,里面包含视频播放尺寸修改、视频播放加序号、视频流存活检测等方案,可做扩展开发使用。

二、代码

import cv2
import time
from func_timeout import func_set_timeout
import func_timeout
import multiprocessing
import numpy as np

# 注意! 因受屏幕分辨率影响,此参数下,桌面1920*1080此处最多适宜播放9个视频!
# 每个视频框的宽度和高度
box_x = 640
box_y = 360

# 最大列视频数
row_max = 3

# 这里留视频流链接或者mp4都可以
# urls = ['rtsp://admin:XXXXX@172.16.1.1:554/Streaming/Channels/101', 'rtmp://172.16.1.2:1935/live1']
urls = [r'E:\AAAA\2024-07-01\0730.mp4', r'E:\AAAA\2024-07-01\0740.mp4',
        r'E:\AAAA\2024-07-01\0750.mp4']


# 视频绝对空间排列
# 规则设定:单行不超过3个
def cell_boxes(cell_num):
    if cell_num <= row_max:
        rows = cell_num
        cols = 1
    else:
        remainder = cell_num % row_max
        quotient = cell_num // row_max
        if remainder:
            cols = quotient + 1
        else:
            cols = quotient
        rows = 3

    print('rows', rows)
    print('cols', cols)
    final_matrix = np.zeros((box_y * cols + 30 * cols, box_x * rows + 30 * rows, 3), np.uint8)

    # 划分每个视频的空间
    boxs_site = []
    for n in range(cap_num):
        n = n + 1
        line_col = n % rows
        if line_col == 0:
            line_row = n // rows
            line_col = rows
        else:
            line_row = n // rows + 1
        start_x = (line_col - 1) * box_x
        start_y = (line_row - 1) * box_y
        end_x = line_col * box_x
        end_y = line_row * box_y
        boxs_site.append([n, [start_y, start_x], [end_y, end_x]])

    return final_matrix, boxs_site


# 视频存活检查
@func_set_timeout(30)  # 设定函数超执行时间
def check(base_url):
    cap = cv2.VideoCapture(base_url)
    cap.read()
    return cap


# 视频播放
def camera_run(q, camera_id, base_url):
    while True:
        try:
            cap = check(base_url)
            print('ok!')
            break
        except func_timeout.exceptions.FunctionTimedOut:
            print(base_url, '未打开')
            time.sleep(5)
            # exit()
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        while True:
            ret, frame = cap.read()
            frame = cv2.resize(frame, (box_x, box_y))
            cv2.putText(
                img=frame,
                text=camera_id,  # 文本
                org=(25, 25),  # 字体开始写的左上角位置
                fontFace=cv2.FONT_HERSHEY_SIMPLEX,  # 字体`
                fontScale=1,  # 字体大小
                color=(255, 0, 0),  # 颜色
                thickness=2  # 字体粗细
            )
            cv2.waitKey(int(float(1 / int(fps)) * 1000))
            q.put([str(int(camera_id) + 1), frame])
    except Exception as e:
        print(e)
        print(f'摄像头 {camera_id} 在读流成功后关闭\n')


if __name__ == '__main__':
    cut_box = []
    msg_box = []

    cap_num = len(urls)

    # 最小行列密集设计
    final_matrix, boxs_site = cell_boxes(cap_num)
    print(boxs_site)

    # 进程通讯队列
    q = multiprocessing.Queue()
    for i in range(cap_num):
        camera_id = str(i)

        # 每路视频设一个进程读流
        a = multiprocessing.Process(target=camera_run, args=(q, camera_id, urls[int(camera_id)]))
        a.start()

    # 主进程视频统一播放
    while True:
        # 获取子进程传回的视频流信息
        msg = q.get()

        camera_id = msg[0]

        # 根据摄像头id获取摄像头展示框的绝对坐标
        site_start = boxs_site[int(camera_id) - 1][1]
        site_end = boxs_site[int(camera_id) - 1][2]

        # 放入主进程公共视频播放窗口
        final_matrix[site_start[0] + 10: site_end[0] + 10, site_start[1] + 10:site_end[1] + 10] = msg[1]
        cv2.imshow('video', final_matrix)

        cv2.waitKey(1)

三、效果