视频在线直播系统:www.hixiu.com;在线聊天系统demo:www.liaofuwu.com |
核心系统框架
视频直播核心系统架构主要包括Web端架构、聊天系统架构、视频直播、用户状态同步架构等。
Web端框架
由Nginx组成的前端负载集群,后端由IIS、FPM服务器进行解析。前端将由Nginx集群处理已静态化页面及向后端提交未静态或不做静态化要求的请求,后端Cached为应用缓存,主要减少对数据库无意义请求造成的压力,数据库架构由一主一备组成(目前暂无备库)。
聊天系统框架
聊天系统分为聊天室(ChatRoom)、消息同步中心(CastServer)、用户列表(UserList Server)、系统广播(SystemCast Server)及监控报表(Report)组成。同房间不同服务器之间用户的消息系统广播、及用户列表则通过消息同步中心进行传递。用户连接房间之前由调度中心分配聊天室服务器进行连接。该架构特点是同时在线人数易扩展、可实现负载均衡。
用户状态同步机制
用户状态(Session)的同步机制之所以列为核心,是考虑到用户消费、充值等行为与非用户行为造成用户属性变更能及时反馈给用户,此举能大大提高产品的用户体验。该机制主要实现不同框架、不同服务器、不同站点(域名)之间用户状态的同步。同步之主要工作由SessionComponents组件完成。
事件 |
触发 |
用户端反应 |
是否重发 |
用户充值 |
系统 |
用户成功充值后由系统发起系统广播,通知在线用户充值成功,并无需刷新帐号金额自动同步。 |
否 |
运营发放库存及运营币 |
系统 |
无论用户呆在哪个房间,都能收到该系统消息(目前如果用户不在线,不能成功接收) |
否 |
… |
… |
… |
… |
服务器规划
用途 |
参考配置 |
数量 |
说明 |
聊天 |
CPU:4核,内存8G |
3-6 |
含:聊天室、用户列表、系统广播(注:服务器要求双线机房) |
Web |
CPU:4核,内存:8G |
2-4台 |
站点、充值、接口、资源、活动等服务器 |
前端 |
CPU:4核,内存:8G |
1-2 |
Nginx服务器 |
分步式缓存 |
CPU:4核,内存:8G |
2 |
Memcached服务器(应用缓存与用户Session状态) |
数据库 |
CPU:8核,内存:16G |
2 |
数据库服务器,一主一备 |
流媒体 |
CPU:8核,内存:16G |
1-N |
Rtmp服务器 |
视频系统
视频系统主要包含客户端推流、服务端处理、客户端接收流三大部份。视频流畅效果取决于推流端与接收端两端带宽,任何一方网络带宽问题都会将降低观看者效果,其中推流端由为重要,其将影响所有人观看。
视频插件
高清插件由Flash调用的ActiveX控件,只支持以IE为内核的浏览器调用,适用于主播端。高清插件主要功能有两个,1、加水印;2、进行高清编码。
插件代码片断
void RtmpLiveScreen::Run()
{
//连接rtmp server,完成握手等协议
librtmp_->Open(rtmp_url_.c_str());
//发送metadata包¹
SendMetadataPacket();
//开始捕获音视频
ds_capture_->StartAudio();
ds_capture_->StartVideo();
while (true)
{
if(SimpleThread::IsStop()) break;
// 从队列中取出音频或视频数据
std::deque<RtmpDataBuffer>rtmp_datas;
{
base::AutoLock alock(queue_lock_);
if(false == process_buf_queue_.empty())
{
rtmp_datas = process_buf_queue_;
process_buf_queue_.clear();
}
}
// 加上时戳发送给rtmp server
for (std::deque<RtmpDataBuffer>::iterator it =rtmp_datas.begin();
it!= rtmp_datas.end(); ++it)
{
RtmpDataBuffer& rtmp_data = *it;
if (rtmp_data.data != NULL)
{
if (rtmp_data.type ==FLV_TAG_TYPE_AUDIO)
SendAudioDataPacket(rtmp_data.data, rtmp_data.timestamp);
else
SendVideoDataPacket(rtmp_data.data, rtmp_data.timestamp,rtmp_data.is_keyframe);
}
}
Sleep(1);
}
ds_capture_->StopVideo();
ds_capture_->StopAudio();
librtmp_->Close();
}
流媒体服务
视频流完全可以不经过流媒体服务,由于各家CDN所覆盖区域偏重点不一致,故系统采用流媒体服务实现一对多的推流是为了更好的适应不同区域客户端的网络情况。如下图所示:
CDN
接口说明 |
接口地址 |
说明 |
推流地址 |
http://up.v.XXX.cn/live/ROOMID |
live为发布点名称,与CDN商量协定,ROOMID为房间ID |
拉流地址 |
http://down.v.XXX.cn/live/ROOMID |
同上 |
|
|
|
聊天系统
聊天服务由底层采用Socket中的完成端口方式实现,该系统单台服务器支持10000以上的连接数,虽然所有Socket连接方式的客户端都被接受,但在服务端有身份验证机制防止恶意连接。目前实现对接的客户端有Flash、C++。服务端与客户端通讯由固定协议(包头+包体)完成,包体大都采用JSON进行序列化及反序列化。
通讯协议
每条完整的消息由包头和包体组成(当然包体可以为空,也就是消息仅含包头的情况),包头共20字节,由5组int类型数据组成,分别代表指令ID、房间ID、用户ID、时间及描述包体长度。
Header/Body |
Header |
Body |
|||||||||||||||||||
字节 |
1 |
2 |
3 |
4 |
5 |
6 |
7 |
8 |
9 |
10 |
11 |
12 |
13 |
14 |
15 |
16 |
17 |
18 |
19 |
20 |
JSON |
说明 |
COMMAND |
ROOMID |
USERID |
TIME |
BODYLENGTH |
||||||||||||||||
数值 |
10001 |
10092 |
4697995 |
1706 |
256 |
||||||||||||||||
类型 |
Int |
Int |
Int |
Int |
Int |
String |
指令说明
协议 |
指令 |
指令JSON格式(示例) |
指令描述 |
10001 |
COMMAND_SYSTEM_SYN_MESSAGE |
|
同步系统消息 |
10002 |
COMMAND_SYSTEM_SYN_USERMESSAGE |
|
同步用户消息 |
10003 |
COMMAND_SYSTEM_MESSAGE |
|
系统公告 |
10004 |
COMMAND_SYSTEM_SYN_USEREXITSTATUS |
|
用户退出消息 |
10005 |
COMMAND_SYSTEM_SYN_USERJOINSTATUS |
|
用户进入房间消息 |
10006 |
COMMAND_SYSTEM_USERINFOCHANGE |
|
用户余额变更 |
10007 |
COMMAND_SYSTEM_SYN_ROOMINFOREPORTMESSAGE |
|
同步统计房间用户信息 |
10008 |
COMMAND_SYSTEM_CASTROOMMESSAGE |
|
广播 |
10009 |
COMMAND_SYSTEM_WORLDCASTROOMMESSAGE |
|
世界广播 |
10010 |
COMMAND_SYSTEM_CAST_SUNNY |
|
阳光普照 |
20001 |
COMMAND_USER_GIFTS |
|
用户送礼 |
20002 |
COMMAND_USER_MESSAGE |
|
用户聊天消息 |
20003 |
COMMAND_USER_MANAGER_MESSAGE |
|
用户管理消息 |
20004 |
COMMAND_USER_OPERATE_EXITMESSAGE |
|
用户主动退出消息 |
20005 |
COMMAND_USER_ANCHOR_LIVEIN |
|
主播正在直播消息 |
20006 |
COMMAND_CLIENT_LOGININFO |
|
用户登录消息 |
20007 |
COMMAND_CLIENT_USERLIST |
|
用户列表通知消息 |
20008 |
COMMAND_CLIENT_LOGINOTHERUSER |
|
用户登录后通知其它用户的消息 |
20009 |
COMMAND_CLIENT_LOGINOUTOTHERUSER |
|
用户退出后通知其它用户的消息 |
20010 |
COMMAND_CLIENT_USERROLELIST |
|
用户权限列表 |
20011 |
COMMAND_CLIENT_APPENDUSERLIST |
|
追加用户列表 |
20012 |
COMMAND_CLIENT_SENDMESSAGESOFAST |
|
警告发送过快消息 |
20013 |
COMMAND_CLIENT_REQUESTSENDUSERLIST |
|
请求加载用户列表消息 |
30001 |
COMMAND_RETURN_CONNECTED |
|
客户端连接成功消息 |
30002 |
COMMAND_RETURN_CANNOTSPEAK |
|
用户被禁言消息 |
30005 |
COMMAND_RETURN_OPERATE_CANNOTSPEAK_SUCCESS |
|
禁言用户成功消息 |
30006 |
COMMAND_RETURN_OPERATE_CANNOTSPEAK_FAILURE |
|
禁言用户失败消息 |
30007 |
COMMAND_RETURN_OPERATE_KICKUSER_SUCCESS |
|
踢出用户成功消息 |
30008 |
COMMAND_RETURN_OPERATE_KICKUSER_FAILURE |
|
踢出用户失败消息 |
30009 |
COMMAND_RETURN_OPERATE_BANIPUSER_SUCCESS |
|
禁IP成功消息 |
30010 |
COMMAND_RETURN_OPERATE_BANIPUSER_FAILURE |
|
禁IP失败消息 |
30011 |
COMMAND_RETURN_OPERATE_RELIEVE_SUCCESS |
|
取消禁言成功消息 |
30012 |
COMMAND_RETURN_OPERATE_RELIEVE_FAILURE |
|
取消禁言失败消息 |
30013 |
COMMAND_RETURN_OPERATE_SETADMIN_SUCCESS |
|
设置用户为管理员成功消息 |
30014 |
COMMAND_RETURN_OPERATE_SETADMIN_FAILURE |
|
设置用户为管理员失败消息 |
30015 |
COMMAND_RETURN_OPERATE_RMADMIN_SUCCESS |
|
删除用户管理员成功消息 |
30016 |
COMMAND_RETURN_OPERATE_RMADMIN_FAILURE |
|
删除用户管理员失败消息 |
30017 |
COMMAND_RETURN_USERVERIFY_FAILURE_KICK |
|
用户验证失败(被踢出了) |
31001 |
COMMAND_RETURN_USERVERIFY_FAILURE |
|
用户验证失败 |
32001 |
COMMAND_RETURN_USER_NOTWORTHBALANCE |
|
用户余额不足 |
32002 |
COMMAND_RETURN_DEDUCTIONS_FAILURE |
|
扣费验证失败 |
40001 |
COMMAND_SCENE_MANAGER_BEGIN |
|
直播开场 |
40002 |
COMMAND_SCENE_MANAGER_END |
|
直播关场 |
40003 |
COMMAND_SCENE_MANAGER_RETURN |
|
操作失败消息 |
40004 |
COMMAND_SCENE_MANAGER_SEATUSERLIT |
|
座位列表消息 |
40005 |
COMMAND_SCENE_MANAGER_ENCORE |
|
安可的开启与关闭 |
40006 |
COMMAND_SCENE_MANAGER_ENCOREINFO |
|
安可信息的发送 |
50001 |
COMMAND_CLIENT_SENDTOCLIENTSTATUS |
|
客户端向服务器报告连接状态 |
60001 |
COMMAND_FACETIME_REQUEST |
|
联麦申请 |
60002 |
COMMAND_FACETIME_UPDATE |
|
联麦信息更新 |
60003 |
COMMAND_FACETIME_USER_COMPLETE |
|
联麦用户 |
60004 |
COMMAND_FACETIME_USERLIST |
|
联麦用户列表 |
60005 |
COMMAND_FACETIME_ADDFACETIMES |
|
增加联麦时间 |
60006 |
COMMAND_FACETIME_REQUESTOUTER |
|
申请退出联麦 |
60007 |
COMMAND_FACETIME_REQUESTCASTUSERLIST |
|
申请广播联麦用户列表 |
2677001 |
SERVER_CHATROOM_USERJOIN |
|
用户进入(同步至用户列表服务器) |
2677002 |
SERVER_CHATROOM_USEROUTER |
|
用户退出(同步至用户列表服务器) |
2677003 |
SERVER_CHATROOM_CASTUSERLIST |
|
广播用户列表(用户列表服务器) |
2677004 |
SERVER_CHATROOM_USERSTAT |
|
发送房间统计数据 |
2677005 |
SERVER_CHATROOM_USERLIST_RESET |
|
清除用户列表服务器上用户(当聊天服务器重启时) |
2677006 |
SERVER_CHATROOM_USERINFO_UPDATE |
|
房间用户信息变更(同步更新至用户列表服务器) |
2677101 |
SERVER_CHATROOM_SYNC_SERVERSTATUS |
|
子服务状态同步 |
2677102 |
SERVER_CHATROOM_SYNC_CASTMESSAGE |
|
广播消息同步 |
2677103 |
SERVER_CHATROOM_SYNC_CONVEYMESSAGE |
|
广播消息中转 |
88482677 |
COMMAND_SYSTEM_OPERATE_GETROOMINFO |
|
获取房间日志 |
Flash的特殊处理
Flash通过Socket连接聊天服务端会出现安全策略问题,通常页面上加载Flash时会默认请求当前域下的crossdomain.xml和这里的情况一样。所以,在服务端监听时对请求做特殊的处理,将策略文件内容通过连接发送回去。一般策略文件请求在连接端口的Socket中收到,但在这之前Flash默认的会先对843端口提交请求,只有失败后或超时(约3秒)才会向连接端口重新请求,所以在843端口另启监听线程专门来应对策略文件的请求会比一般的处理速度更快。
代码片断
消息处理分线程
bool _iocpSocket_OnRecv(RecvMessage recv)
{
try
{
Message message = recv.Message;
if (null == message) return false;
ChatRoom chatRoom =_chatRooms.GetChatRoom(message.Head.RoomId, true);
DealWithMessageThread dealWith = null;
switch ((Command)message.Head.Command)
{
case Command.COMMAND_CLIENT_LOGININFO:
chatRoom.messageDealWithNew(recv.SO,message);
Log.WriteErrorLog("SocketServer::_iocpSocket_OnRecv",message.ToString());
break;
case Command.COMMAND_USER_GIFTS:
dealWith= _dealWithGiftsThreadPool.Get();
dealWith.Add(newMessageParams(0,recv.SO, recv.Message, chatRoom,null));
break;
default:
dealWith= _dealWithMessageThreadPool.Get();
dealWith.Add(newMessageParams(0,recv.SO, recv.Message, chatRoom,null));
break;
}
return true;
}
catch (Exceptionex)
{
Log.WriteErrorLog("SocketServer::_iocpSocket_OnRecv",ex.Message);
}
returnfalse;
}
各服务器间的消息同步
privatevoid recvMessageDealWith(Messagemessage,EndPoint endPoint)
{
try
{
_tmpMessage.Add(String.Format("[{0}]来自{1}的消息{2}",DateTime.Now,endPoint,JsonConvert.SerializeObject(message.Head)));
if (_tmpMessage.Count > 50)
{
_tmpMessage.RemoveAt(0);
}
string ip = Convert.ToString(endPoint).Split(':')[0];
//string port = Convert.ToString(endPoint).Split(':')[1];
switch ((Command)message.Head.Command)
{
///服务器状态同步
case Command.SERVER_CHATROOM_SYNC_SERVERSTATUS:
ServerInfo info = ByteConverter.BytesToObject(message.Body)asServerInfo;
ChildServer _s = loadServer(ip, info);
_s.Reset(info);
if (_s.IsNode && null== _nodeServer)
{
_nodeServer= _s;
}
break;
default:
sysCast(message,ip);
break;
}
}
catch (Exceptionex)
{
Log.WriteErrorLog("ServerControl::recvMessageDealWith","{0}",Encoding.UTF8.GetString(message.Body));
}
}
策略文件监听
_crossDomainServer = new CrossDomainServer();
bool ret =_crossDomainServer.StartListen(_helper.GetCrossDomainPolicyPort(),_helper.GetCrossDomainPolicyData());
if (ret)
{
Log.WriteSystemLog("SocketServer::Listen","服务器[{0}]成功开启安全策略请求的监听器,端口为[{1}]", _helper.GetLocalServerId(), _helper.GetCrossDomainPolicyPort());
}
else
{
Log.WriteSystemLog("SocketServer::Listen","警告警告服务器 [{0}]开启安全策略请求的监听器,端口为 [{1}]失败”,_helper.GetLocalServerId(), _helper.GetCrossDomainPolicyPort());
}
策略文件处理
if (null!= client)
{
SendMessage sm = newSendMessage(client, _crossDomainXmlBytes);
client.BeginSend(sm.SendBuffer,sm.SendIndex, sm.SendCount
,SocketFlags.Partial,newAsyncCallback(clientSendCallback),sm);
}
Flash端连接处理
var tryCount:int = 0;//重连次数
var reconndelay:int = 3000;//重连间隔
var socket = new Socket();
socket.addEventListener(Event.CONNECT,connectOk);
socket.addEventListener(Event.CLOSE,closed);
socket.addEventListener(IOErrorEvent.IO_ERROR,ioErrored);
socket.addEventListener(SecurityErrorEvent.SECURITY_ERROR,securityErrored);
connectToServer();
private function connectToServer():void
{
woo.debug.debug.trace("socket","正在连接到"+HOST+":"+PORT);
setTimeout(function():void{socket.connect(HOST,PORT);},reconndelay*tryCount);
tryCount ++;
}
private function securityErrored(eve:SecurityErrorEvent):void
{
//安全策略错误处理
}
private function ioErrored (eve:SecurityErrorEvent):void
{
//IOERROR处理
}
private function closed (eve:SecurityErrorEvent):void
{
connectToServer();//重连
}
Flash端消息发送
socket.addEventListener(ProgressEvent.SOCKET_DATA,receiveData);
private function receiveData(eve:ProgressEvent =null):void
{
if(this.DATA.CMD==0)
{
if(socket.bytesAvailable>=20)
{
varhead:ByteArray = new ByteArray();
socket.readBytes(head,0,20);
varbyte0:ByteArray = new ByteArray();
varbyte1:ByteArray = new ByteArray();
varbyte2:ByteArray = new ByteArray();
varbyte3:ByteArray = new ByteArray();
varbyte4:ByteArray = new ByteArray();
byte0.position= 0;
byte1.position= 0;
byte2.position= 0;
byte3.position= 0;
byte4.position= 0;
head.readBytes(byte0,0,4);
head.readBytes(byte1,0,4);
head.readBytes(byte2,0,4);
head.readBytes(byte3,0,4);
head.readBytes(byte4,0,4);
DATA.CMD= byteToInt(byte0);
DATA.ROOMID= byteToInt(byte1);
DATA.USERID= byteToInt(byte2);
DATA.TIME= byteToInt(byte3);
DATA.BODYLENGTH= byteToInt(byte4);
}
else
{return;}
}
if(socket.bytesAvailable>=this.DATA.BODYLENGTH)
{
varbody:String = socket.readUTFBytes(DATA.BODYLENGTH);
DATA.BODY =body;
this.sendNotification(Notifications.M_SOCKET_RECEIVE_MSG,this.DATA);
data = null;
data = newSocketMsgVo();//完整消息取完,清空数据对象
if(20<=socket.bytesAvailable)
{
arguments.callee();//再执行当前方法
}
else
{return;}
}
}
Flash消息的发送
public function sendMsg(vo:SocketMsgVo):void
{
if(socket.connected)
{
socket.writeBytes(vo.bytes,0,vo.bytes.length);
socket.flush();
}
else
{
socket.connect(HOST,PORT);
}
}
充值
充值采用第三方充值平台实现,可接入支付宝、易宝等第三方开发平台,原理简单,不做详细介绍。
虚拟礼物
虚拟礼物的实现主要分为两块,用户提交送礼请求由扣费服务验证,成功后广播到房间所有用户在前端展示礼物效果。
送礼详细流程
扣费代码片断
礼物展示代码片断
//播放礼物
publicfunction ShowGiftInter(id:int, num:int,type:int,w:int,h:int, msg:String ="") {
this.setStageSize(w,h);
_gift.showGiftInter(id, num, type,msg);
trace("123123");
}
//设置播放区域
publicfunction setStageSize(wid:int, het:int) {
_gift.resetGroup(wid - w,het - h);
w = wid;
h = het;
_fs.setStageSize(wid, het);
_gift.setStageSize(wid, het);
}
//播放飞屏
publicfunction showFlyText(str:String,w:int,h:int,speed:int = 4) {
setStageSize(w,h);
_fs.setStaticText(str, speed);
}
//播放进场动画
publicfunction ShowCarInter(id:int,w:int,h:int,_no:int,_nick:String,_sex:int)
{
this.setStageSize(w,h);
_gift.showGiftInter(id,1,5,{liangNo:_no,nickName:_nick,sex:_sex});
}