利用python3.5 构建流媒体后台音视频切换的服务端程序

时间:2022-01-02 00:15:22
#!/usr/bin/env python3.5.0
# -*- coding:utf8 -*-
import os,sys,socket,hashlib,time,select,threading,configparser
import pymssql
rootdir =os.path.abspath(sys.argv[0])
rootdir =os.path.dirname(rootdir) +"/"
cf =configparser.ConfigParser()
if os.path.exists(rootdir +'srs_server.conf'):
cf.read(rootdir +'srs_server.conf')
else:
# 如果文件不存在
f = open(rootdir + "/srs_server.conf","w")
f.write("")
f.close()
# 读取配置文件
cf.read(rootdir +'srs_server.conf')
cf.add_section("srs")
cf.set("srs","config","")
# 写回配置文件
cf.write(open(rootdir +'srs_server.conf',"w"))
def decrypt(s,key=2):
"""
解密方法
:param key:
:param s:
:return:
"""
c = bytearray(str(s).encode("gbk"))
n = len(c) # 计算 b 的字节数
if n % 2 != 0 :
return ""
n = n // 2
b = bytearray(n)
j = 0
for i in range(0, n):
c1 = c[j]
c2 = c[j+1]
j = j+2
c1 = c1 - 65
c2 = c2 - 65
b2 = c2*16 + c1
b1 = b2^ key
b[i]= b1
try:
return b.decode("gbk")
except:
return "failed"
# 实例化数据库类
class MSSQL:
def __init__(self,host,user,pwd,db,port =1433):
self.host = host
self.port = port
self.user = user
self.pwd = pwd
self.db = db
def __GetConnect(self):
try:
if not self.db:
print("没有设置数据库信息")
self.conn = pymssql.connect(host=self.host,port=self.port,user=self.user,password=self.pwd,database=self.db,charset="utf8")
cur = self.conn.cursor()
if not cur:
print("连接数据库失败!")
else:
return cur
except Exception as e:
print("连接数据库失败,%s"%e)
def ExecQuery(self,sql):
cur = self.__GetConnect()
cur.execute(sql)
resList = cur.fetchall()
# 查询完毕后必须关闭连接
self.conn.close()
return resList def ExecNonQuery(self,sql):
cur = self.__GetConnect()
cur.execute(sql)
self.conn.commit()
self.conn.close()
# 查询数据库相关信息情况
# 更新SQL数据库相关信息情况
def updatesql(load=0,status=0,item=0):
# 加载配置文件
cf = configparser.ConfigParser()
if os.path.exists(rootdir+"srs_server.conf"):
try:
cf.read(rootdir+"srs_server.conf")
except Exception as c:
print(c)
else:
print("加载srs_server.conf配置文件失败!")
# 实例化MSSQL操作类
host = cf.get("HOST","host")
ms = MSSQL(host=cf.get("DB","ip"),user=decrypt(cf.get("DB","username")),pwd=decrypt(cf.get("DB","password")),db=cf.get("DB","db"),port=int(cf.get("DB","port")))
try:
# 更新该服务器所有记录
sql = "update sys_load set item = '%s',sysload='%s',status='%s' where servername ='%s'"%(item, load, status, host)
ms.ExecNonQuery(sql)
except:
pass def selectdata(conn):
conf = cf.get("srs","config")
if conf == "":
conn.send(bytes("当前为音频模式!","utf-8"))
elif conf == "":
conn.send(bytes("当前为视频模式!","utf-8"))
def updatesound(conn):
with open(rootdir +'srs.conf',"r") as f:
data = f.readlines()
for i,line in enumerate(data):
# 去除制表符,换行符
line = line.replace('\t','').replace('\n','').replace(' ','')
if line =="#allowpublish220.168.91.254;":
data[i] ="\tallow\t\tpublish\t\t220.168.91.254;\t\n"
elif line == "#forward10.27.68.150;":
data[i] = "\tforward\t\t10.27.68.150;\t\t \n"
elif line == "transcodelive/livestream{":
f.seek(i+1)
data[i+1] = "\tenabled\t\ton;\n"
with open(rootdir +'srs.conf',"w") as xf:
xf.writelines(data)
xf.close()
status = os.system("service srs restart")
if status == 0:
cf.read(rootdir +'srs_server.conf')
cf.set("srs","config","")
cf.write(open(rootdir +'srs_server.conf',"w"))
updatesql(item=0,load=1,status=0)
conn.send(bytes("当前已成功切换为音频模式","utf-8"))
else:
conn.send(bytes("切换为音频模式失败,请联系IT部!","utf-8"))
def updatevideo(conn):
with open(rootdir +'srs.conf',"r") as f:
data = f.readlines()
for i,line in enumerate(data):
# 去除制表符,换行符
line = line.replace('\t','').replace('\n','').replace(' ','')
if line =="allowpublish220.168.91.254;":
data[i] ="\t#allow\t\tpublish\t\t220.168.91.254;\t\n"
elif line == "forward10.27.68.150;":
data[i] = "\t#forward\t10.27.68.150;\n"
elif line == "transcodelive/livestream{":
# 下一行
f.seek(i+1)
data[i+1] = "\tenabled\t\toff;\n"
with open(rootdir +'srs.conf',"w") as xf:
xf.writelines(data)
xf.close()
status = os.system("service srs restart")
if status == 0:
cf.read(rootdir +'srs_server.conf')
cf.set("srs","config","")
cf.write(open(rootdir +'srs_server.conf',"w"))
updatesql(item=1,load=3,status=1)
conn.send(bytes("当前已成功切换为视频模式","utf-8"))
else:
conn.send(bytes("切换视频模式失败,请联系IT部!","utf-8"))
def crash_recovery(conn):
os.system("service srs stop")
# 强制还原配置文件
os.system("yes|cp -fr srs_bak.conf srs.conf")
status = os.system("service srs start")
if status == 0:
cf.read(rootdir +'srs_server.conf')
cf.set("srs","config","")
cf.write(open(rootdir +'srs_server.conf',"w"))
updatesql(item=0,load=1,status=0)
conn.send(bytes("修复音频模式成功","utf-8"))
else:
conn.send(bytes("修复音频模式失败,请与IT部联系!","utf-8"))
# 业务执行函数
def operation(conn):
content = """
请输入数字选项进行操作:
1、查询当前直播频道
2、切换频道为音频直播
3、切换频道为视频直播
4、故障修复(默认修复为音频模式)
***********************
"""
conn.send(bytes(content,"utf-8"))
while True:
# 接收数据
data = conn.recv(1000)
test =data.decode()
if int(test) == 1:
selectdata(conn)
elif int(test) ==2:
updatesound(conn)
elif int(test) ==3:
updatevideo(conn)
elif int(test) ==4:
crash_recovery(conn)
# SOCKET主进程模块
def process(conn,addr):
try:
i = 0
# 认证失败允许重试3次
while i < 3:
flage = False
# 接收客户端连接请求信息
info = conn.recv(1000)
# 实例化加密函数
hash = hashlib.sha512()
hash.update("a123456789B".encode("utf-8")) # KEY=a123456789B
hash_pwd = hash.hexdigest()
if info.decode() == hash_pwd:
hash_pwd = ""
info = ""
if addr[0] in ["192.168.1.252","192.168.1.190","127.0.0.1"]:
flage = True
# 接收用户及密码信息
while flage:
operation(conn)
else:
# 登陆失败,发送给客户端重新验证
i += 1
conn.send(bytes("error","utf8"))
if i > 2:
# 主动关闭连接
conn.close()
time.sleep(25)
except Exception as e:
conn.close()
time.sleep(15)
# SOCKET服务模块
def sock_server():
'''
启动服务器端,开启线程监听
:return:
'''
server = socket.socket()
server_ip ="localhost"
server_port = 1888
server.bind((server_ip,server_port))
server.listen(10)
while True:
r,w,e = select.select([server,], [], [], 1)
for i,server in enumerate(r):
time.sleep(1)
conn,addr = server.accept()
# 创建线程
t = threading.Thread(target=process, args=(conn, addr))
# 启动线程
t.start()
if __name__ =="__main__":
sock_server()