在ESFramework 4.0 进阶(02)-- 核心:消息处理的骨架流程一文中我们详细介绍了ESFramework中消息处理的骨架流程,并且我们已经知道,ESFramework中的所有通信引擎使用的都是这一套骨架流程。ESFramework内置了多种通信引擎以完全支持“客户端/服务端、TCP/UDP、文本协议/二进制协议”这些特性的组合。本文就来剖析ESFramework中的各种通信引擎。
一.通信引擎接口继承关系图
INetEngine是所有网络引擎的基础接口,接下来再派生出服务端引擎接口和客户端引擎接口,然后分别派生出服务端TCP引擎接口和客户端TCP引擎接口;另外,UDP引擎接口分别与服务端引擎接口和客户端引擎接口结合形成服务端UDP引擎接口和客户端UDP引擎接口。
看到这个关系图,大家可能会有两个疑问:
(1)该图体现了服务端引擎/客户端引擎、TCP引擎/UDP引擎,但是,对文本协议/二进制协议的支持没有体现出来?
(2)这个关系图非常的对称,但是有一点,图中既然有IUdpEngine接口,为什么没有ITcpEngine接口了?
对于第一个问题,是这样的,ESFramework对文本协议/二进制协议的支持是在具体的引擎类的实现的时候来定义的。正如下图所示:
甚至,有些引擎类对文本协议/二进制协议的支持仅仅需要通过设置ContractFormatStyle属性就可以指定。
对于第二个问题,可以这样理解:对于UDP来说,由于它是基于非连接的,服务端引擎和客户端引擎使用完全相同的Socket模式,所以可以提取出相同性质的接口;而对于TCP,其是基于连接的协议,服务端引擎模型与客户端引擎模型的本质差异就太大了,以至于没必要去刻意地提取一个ITcpEngine接口。
二.通信引擎基础接口INetEngine
所有的通信引擎的根都是INetEngine接口,我们先来看看它的定义:
public interface INetEngine : IDisposable
{
/// <summary>
/// 传输层协议类型,TCP或UDP
/// </summary>
ProtocolType ProtocolType { get; } /// <summary>
/// 引擎实例的创建时间。
/// </summary>
DateTime CreateTime { get; } /// <summary>
/// ESFramework 规定的标准的日志记录器。
/// </summary>
IAgileLogger EsfLogger { get; set; } /// <summary>
/// Socket(网卡)发送缓冲区的大小。默认为8k。
/// </summary>
int SocketSendBuffSize { get; set; } /// <summary>
/// Socket(网卡)接收缓冲区的大小。默认为8k。
/// </summary>
int SocketReceiveBuffSize { get; set; } /// <summary>
/// 网络引擎能够接收的最大的消息尺寸。据此网络引擎可以为每个Session/Connection开辟适当大小的接收缓冲区。
/// 默认为1k。当接收到的消息尺寸超过MaxMessageSize时,将会关闭对应的连接(对TCP)或丢弃数据(对UDP)。
/// </summary>
int MaxMessageSize { get; set; } /// <summary>
/// 是否异步处理消息,默认值false(即在接收消息的线程中处理消息)。
/// </summary>
bool HandleMessageAsynchronismly { get; set; } /// <summary>
/// 初始化网络引擎。如果修改了引擎配置参数,在应用新参数之前必须先重新调用该方法初始化引擎。
/// </summary>
void Initialize(); /// <summary>
/// 等价于调用Initialize后,再调用Start。
/// </summary>
void InitializeAndStart(); /// <summary>
/// 启动引擎
/// </summary>
void Start(); /// <summary>
/// 停止引擎。调用Start方法可以再次启动引擎。
/// 对于TCP服务端引擎,表示不再接收新的连接,并关闭现有所有连接。
/// 对于UDP或客户端引擎,表示不再接收后续的消息。
/// </summary>
void Stop(); /// <summary>
/// 消息分派器。网络引擎将调用它来分派接收到的消息。
/// </summary>
IMessageDispatcher MessageDispatcher { get; set; } /// <summary>
/// 接收到一个完整的消息时触发该事件。
/// </summary>
event CbGeneric<IUserAddress, IMessage> MessageReceived; /// <summary>
/// 将消息成功发送之后触发该事件
/// </summary>
event CbGeneric<IUserAddress, IMessage> MessageSent; /// <summary>
/// 当接收到不完整或无法解析的数据时触发该事件
/// </summary>
event CbGeneric<IUserAddress, MessageInvalidType> InvalidMsgReceived;
}
注释已经做了很多说明,这里还有几点要补充一下:
(1)INetEngine继承自IDisposable接口,这表示通信引擎内部使用了一些宝贵的需要及时释放的资源,像Socket句柄、缓冲区、内存、线程等等。所以,当不再需要使用某个通信引擎的实例时,请及时调用它的Dispose方法来释放这些资源。
(2)SocketSendBuffSize属性和SocketReceiveBuffSize属性设置的是网卡的缓冲区大小,这两个属性的设定将被直接提交给OS底层。而我们应用程序中用来接收和发送数据的内存缓冲区是另外的部分,不要与这两个属性混淆了。
(3)MaxMessageSize属性的值决定了我们的应用程序需要为每个连接或Session开辟多大的接收缓冲区,通常,我们的接收缓冲区的尺寸的设置至少要比MaxMessageSize大,以保证缓冲区能接收一条完整的消息。当然也有特例,这个以后再讲。
(4)HandleMessageAsynchronismly属性的值决定了是否在接收线程中同步处理消息。如果为false,表示同步处理消息,在这种情况下,需要特别注意,在前一条消息的处理中,不能使用后续的消息(因为后续的消息要等本条消息处理完后才会接收),否则会导致无限阻塞。
(5)Initialize方法和Start方法:Initialize方法用于绑定端口或建立连接,Start方法用于启动监听线程或接收线程。
(6)对于Stop方法,要特别注意一点:如果是UDP引擎或客户端引擎,且消息是同步处理的(HandleMessageAsynchronismly为false),那么请不要在消息处理器中同步调用该方法(异步调用Stop则可以),否则,会导致死锁。因为Stop方法使用了Mutex,要等到最后一条消息处理完毕其调用才会返回。
(7)MessageDispatcher属性的注入就使得通信引擎与我们在ESFramework 4.0 进阶(02)-- 核心:消息处理的骨架流程一文中介绍的骨架流程结合起来了。
(8)通过MessageReceived事件和MessageSent事件,我们可以监控到通信引擎接收和发送的每一条消息。用户管理器IUserManager正是通过这些事件来跟踪每个在线用户的相关状态的。
(9)当通信引擎接收到无效的消息时,会触发InvalidMsgReceived事件,该事件的第一个参数是无效消息的发送者源地址,第二个参数为无效消息的类型,分为如下几种:
public enum MessageInvalidType
{
/// <summary>
/// 正常消息。
/// </summary>
Valid = 0,
/// <summary>
/// 消息尺寸溢出。
/// </summary>
MessageSizeOverflow,
/// <summary>
/// 无效的消息头
/// </summary>
InvalidHeader,
/// <summary>
/// 无效的标识符
/// </summary>
InvalidToken
}
三.服务端引擎基础接口IServerEngine
服务端引擎接口定义如下:
public interface IServerEngine : INetEngine
{
/// <summary>
/// 要监听的本机IP地址。如果是多网卡机器,希望同时监听多块网卡,则设置为null或""。
/// </summary>
string IPAddress { get; set; } /// <summary>
/// 要监听的本机端口。
/// </summary>
int Port { get;set;} /// <summary>
/// 每个服务器的唯一标识。
/// </summary>
int ServerID { get; set; } /// <summary>
/// 主动给某个客户同步发信息,不经任何处理,直接发送。注意:如果引擎已经停止,则直接返回。
/// </summary>
void SendMessageToClient(IUserAddress adderss, IMessage msg); /// <summary>
/// 主动给某个客户异步发信息,不经任何处理,直接发送。注意:如果引擎已经停止,则直接返回。
/// </summary>
void PostMessageToClient(IUserAddress adderss, IMessage msg);
}
注释也很详细,但是还有必要唠叨几句:
(1)IPAddress属性可以不设置,但是如果设置,其值一定要是当前机器所配置的IP之一。如果IPAddress属性的值不是当前机器的IP,则在调用引擎的Initialize方法时将抛出异常。
(2)服务端引擎必须通过某个Port为客户端提供服务, 引擎会在Initialize方法中绑定这个指定的端口。
(3)ServerID用于在ESFramework Platform体系中标志Cluster中的每一台服务器。在一般简单的通信应用中,可以忽略它。
(4)我们可以通过服务端引擎的SendMessageToClient方法和PostMessageToClient方法主动给客户端发消息。要特别注意的是,结合ESFramework 4.0 进阶(02)-- 核心:消息处理的骨架流程一文中的流程图,我们可以发现,如果直接在引擎这一层向网络发送消息,则这个消息是没有经过MessagePipe的。对于那些要经过MessagePipe加密之后才能发送的消息,需要在调用服务端引擎的SendMessageToClient和PostMessageToClient方法之前手动调用加密算法,这个会比较麻烦。幸运的是,ESFramework提供了正规发送器ESFramework.Server.IRegularSender,通过IRegularSender来主动发送消息,它会保证消息在提交给引擎发送到网络之前,经过MessagePipe进行正确的变换。
(5)一般情况下,如果给单个用户发送消息,使用SendMessageToClient方法就可以;如果是群发给多个用户(通过foreach遍历),则最好使用PostMessageToClient,这样可以避免延迟累积。
四.客户端引擎基础接口IPassiveEngine
客户端引擎接口定义如下:
public interface IPassiveEngine :INetEngine
{
/// <summary>
/// 服务器地址
/// </summary>
AgileIPEndPoint ServerIPEndPoint { get; set; }
/// <summary>
/// 采用哪个端口与服务器进行通信,如果取值小于等于0,则表示由系统自动分配。
/// </summary>
int Port { get;set;} /// <summary>
/// 发送消息的通道是否正忙。
/// </summary>
bool ChannelIsBusy { get; } /// <summary>
/// 将消息发送给服务器,不经任何处理,直接发送。注意:如果引擎已经停止,则直接返回。
/// </summary>
/// <param name="msg">要发送的消息</param>
/// <param name="dataPriority">消息的优先级</param>
void SendMessageToServer(IMessage msg, DataPriority dataPriority);
}
(1)ServerIPEndPoint属性告知客户端引擎需要与哪个服务器(包括IP和Port)进行通信。AgileIPEndPoint类型仅仅是对IPEndPoint的简单封装,目的在于我们可以通过类似Spring的IOC容器在配置文件中对IPE进行配置。
(2)客户端可以通过Port属性来指定使用哪个端口与服务器进行通信。但是如果指定的端口已经被使用,客户端引擎会自动切换使用该端口附近的某个端口,真正绑定的端口可以通过Port属性的getter获取。
(3)由于客户端引擎采用了发送队列,所以可以检测到队列中是否还有未发送的消息,如果有,则ChannelIsBusy属性的值为true。我们也许可以根据该属性的值来决定是否要丢弃后续某些不重要的待发送的消息。
(4)SendMessageToServer方法用于客户端主动给服务器发送消息。同服务端引擎的SendMessageToClient方法和PostMessageToClient方法一样,被发送的消息是没有经过MessagePipe的。但是,ESFramework同样也提供了用于客户端的正规发送器ESFramework.Passive.IRegularSender。
(5)客户端通信引擎为发送的消息提供了优先级,它确保那些优先级高的消息将被先发送。优先级分为四种:
public enum DataPriority
{
High = 0 ,//紧急命令
Common, //如普通消息
Low , //如传递的文件
CanBeDiscarded //如聊天的视频数据、音频数据
}
当存放CanBeDiscarded类型的消息的发送队列满时,后续进来的CanBeDiscarded类型的消息将会挤掉发送队列中最先进入的消息。
五.TCP服务端引擎接口ITcpServerEngine
TCP服务端引擎接口定义如下:
public interface ITcpServerEngine : IServerEngine
{
/// <summary>
/// 某个连接连上后,如果在ExpiredSpanInSecs时间内不发送任何数据,则将关闭该连接。
/// 如果ExpiredSpanInSecs小于等于0,则不做过期检查。默认值为0。
/// </summary>
int ExpiredSpanInSecs { get; set; } /// <summary>
/// 当前在线连接的数量。
/// </summary>
int ConnectionCount { get; }
/// <summary>
/// 服务器允许最大的同时连接数。
/// </summary>
int MaxConnectionCount { get;} /// <summary>
/// 给每个连接发送数据的超时时间(默认为-1,无限)。
/// </summary>
int WriteTimeoutInMSecs { get; set; } /// <summary>
/// 监听器是否开启。
/// </summary>
bool IsListening { get; } /// <summary>
/// 主动关闭连接,触发SomeOneDisconnect事件。
/// </summary>
void CloseOneConnection(IUserAddress adderss);
/// <summary>
/// 关闭或开启监听器。该方法调用不影响网络引擎的消息接收和处理。
/// </summary>
void ChangeListenerState(bool enabled);
/// <summary>
/// 获取所有在线连接的客户端的地址。
/// </summary>
IList<IUserAddress> GetAddressList(); /// <summary>
/// 当某连接断开或者从该连接上接收非法/无效的消息时,触发该事件
/// </summary>
event CbGeneric<IUserAddress> SomeOneDisconnect; /// <summary>
/// 当tcp连接建立成功时,触发此事件。
/// </summary>
event CbGeneric<IUserAddress> SomeOneConnected; /// <summary>
/// 当tcp连接数量发生变化时,触发此事件。
/// </summary>
event CbGeneric<int> ConnectionCountChanged; /// <summary>
/// 当连接监听器的状态发生变化时,触发此事件。事件参数为true,表明连接监听器启动;事件参数为false,表明连接监听器已停止。
/// </summary>
event CbGeneric<bool> ListenerStateChanged;
}
(1)ExpiredSpanInSecs属性的作用是用于断开那些仅仅连接上来却在一段时间内不发送任何数据的客户端,以释放TCP连接的资源。
(2)MaxConnectionCount属性可以控制服务端最多允许多少人同时在线。这个属性值可以根据服务器的硬件配置、项目的需求以及业务逻辑的复杂程度进行恰当设置。
(3)当网络状况很糟糕时,给客户端发送一条消息可能需要数秒甚至数十秒的时间,WriteTimeoutInMSecs属性就用于控制发送所需时间的最大值。如果在WriteTimeoutInMSecs时间内未将数据发送完,则将记录异常到日志,并关闭对应的TCP连接。该属性只对同步模式发送数据有效,异步发送数据(Post)则无法设定超时时间。最好给该属性赋一个有限值,因为在某些情况下,发送数据可能会导致相当长时间的阻塞。
(4)只有当IsListening为true时,表示正在监听,引擎才会接收新到来的tcp连接请求。通常在停止服务器时,我们都希望先是不再接收新的连接,但让旧的连接上的通信仍然正常、后台的业务处理也不会被干扰,这样可以逐渐地释放当前在线的用户,而不是粗暴地断开所有连接。要达到这样的效果,只要调用ChangeListenerState方法将IsListening设为false即可。当IsListening属性的值发生变化时,还会触发引擎的ListenerStateChanged事件。
(5)CloseOneConnection方法使得服务端可以主动断开指定的连接,有时也许是为了及时释放资源,有时也许是为了剔除恶意的用户。
(6)我们可以通过引擎的SomeOneConnected事件和SomeOneDisconnect事件来感知新连接的建立和旧连接的断开。用户管理器IUserManager正是通过预定这两个事件来准确判定每个用户的在线状态的。
六.TCP客户端引擎接口ITcpPassiveEngine
TCP客户端引擎接口定义如下:
public interface ITcpPassiveEngine :IPassiveEngine
{
/// <summary>
/// 当客户端与服务器的TCP连接断开时,将触发此事件。
/// </summary>
event CbGeneric ConnectionInterrupted; /// <summary>
/// 自动重连开始时,触发此事件。
/// </summary>
event CbGeneric ConnectionRebuildStart; /// <summary>
/// 自动重连成功后,触发此事件。
/// </summary>
event CbGeneric ConnectionRebuildSucceed; /// <summary>
/// 自动重连超过最大重试次数时,表明重连失败,将触发此事件。
/// </summary>
event CbGeneric ConnectionRebuildFailure; /// <summary>
/// 当前是否处于连接状态。
/// </summary>
bool Connected { get; } /// <summary>
/// 当与服务器断开连接时,是否自动重连。
/// </summary>
bool AutoReconnect { get; set; } /// <summary>
/// 当连接断开时,自动重连尝试的最大次数。默认值为int.MaxValue。
/// </summary>
int MaxRetryCount4AutoReconnect { get; set; } /// <summary>
/// 可丢弃的消息发的送队列的大小。默认值1。
/// </summary>
int QueueSizeOfDiscarded { get; set; } /// <summary>
/// 不可丢弃的消息的发送队列的大小。默认值1。
/// </summary>
int QueueSizeOfNonDiscarded { get; set; } /// <summary>
/// 依据ContractFormatStyle属性,注入相应的ContractHelper。
/// </summary>
IContractHelper ContractHelper { get; set; } /// <summary>
/// 手动重连。如果当前处于连接状态,则直接返回。
/// </summary>
/// <param name="retryCount">重试次数</param>
/// <param name="retrySpanInMSecs">重试间隔时间,毫秒</param>
void Reconnect(int retryCount, int retrySpanInMSecs);
}
(1)TCP客户端引擎支持掉线后自动重连的功能,并会根据连接状态的改变触发相应的“连接断开”、“重连开始”、“重连成功”、“重连失败(重连尝试的次数超过MaxRetryCount4AutoReconnect属性设定的值时触发)”等事件。
(2)通过将AutoReconnect属性设为true以开启自动重连机制。如果AutoReconnect为false,可以在连接断开后,手动调用Reconnect方法进行重连尝试。
(3)在前面介绍消息优先级时,我们讲到针对每个优先级类型,都会有一个发送队列与之对应,发送队列的大小即是由QueueSizeOfNonDiscarded属性和QueueSizeOfDiscarded属性的值来控制的。通常名称便知,QueueSizeOfNonDiscarded属性用于控制优先级类别为DataPriority.High、DataPriority.Common、DataPriority.Low的发送队列的尺寸,而QueueSizeOfDiscarded用于控制DataPriority.CanBeDiscarded的发送队列的尺寸。
(4)在ESFramework 4.0 进阶(01) -- 消息一文中讲到,ESFramework中消息的解析是通过IContractHelper来完成的,根据TCP客户端引擎所支持的消息类型,可以分别注入IStreamContractHelper实例(二进制协议)或ITextContractHelper实例(文本协议)。
七.UDP引擎基础接口IUdpEngine
UDP引擎基础接口定义如下:
public interface IUdpEngine
{
/// <summary>
/// 所采用的消息协议的格式:二进制协议或文本协议。
/// </summary>
ContractFormatStyle ContractFormatStyle { get; set; } /// <summary>
/// 内部是否使用增强型的EnhancedUdpClinet。
/// </summary>
bool UseEnhancedUdp { get; set; } /// <summary>
/// 引擎内部采用的EnhancedUdpClient实例。
/// set方法:如果UseEnhancedUdp为true而又不设置该属性,则引擎会实例化一个默认的EnhancedUdpClient。
/// get方法:如果UseEnhancedUdp为false,则返回null。暴露该属性使得应用可以监控增强型的EnhancedUdpClient的运行状态。
/// </summary>
IEnhancedUdpClient EnhancedUdpClient { get; set; } /// <summary>
/// 如果UseEnhancedUdp为true,则由该属性决定EnhancedUdpClinet使用何种发送策略来发送DataPriority.CanBeDiscarded优先级的消息。
/// 决定DataPriority.CanBeDiscarded到DatagramSendingType的映射。
/// </summary>
IDatagramSendingTypeDecider DatagramSendingTypeDecider { set; } /// <summary>
/// 可以一次发送的UDP数据报的最大尺寸,如果超过这个尺寸,则会将消息拆分。
/// </summary>
int MaxUdpDatagramLength { get; set; }
}
(1)最特殊的,ESFramework中的UDP引擎支持UDP增强。所谓UDP增强,就是ESFramework内置了一套应用层的机制,以保证在通过UDP进行通信时,不会丢失数据、也不会出现不完整错误的数据。就像是使用TCP一样,但是区别在于,TCP是在协议层来保证数据的正确、完整性的,而ESFramework的UDP增强则是在应用层做的,所以,UDP增强的效率要比TCP低很多。因此,如果需要保证数据的一致完整性,能使用TCP的地方就使用TCP;如果TCP通道不可用,才可考虑是否使用UDP增强。
(2)UDP增强实现比较复杂,而使用者也没必须要了解它的实现,如果确实需要用到UDP增强,只要将UseEnhancedUdp属性配置为true就可以自动开启了。 还有一点要注意,要使UDP增强能正确运行,通信的双方必须都使用增强的UDP,如果一方使用增强的UDP,而另一方使用普通UDP,则通信会发生错误。
(3)UDP增强不仅仅适用于服务端与客户端之间的通信,也适用于客户端之间经过P2P通道的通信。
(4)当启用了UDP增强,则可以通过设置DatagramSendingTypeDecider属性来决定每个消息的发送策略。 有三种策略可以选择:
/// <summary>
/// 当通过EnhancedUdpClient来发送数据报时,采取的发送策略。
/// </summary>
public enum DatagramSendingType
{
/// <summary>
/// 可靠正序的发送
/// </summary>
Reliable = 0, /// <summary>
/// 使用可靠的通道发送,但是当通道处于Block状态时,则丢弃要发送的数据报。(保证实时性)
/// </summary>
DiscardOnBlock, /// <summary>
/// 使用允许丢弃数据报的通道发送
/// </summary>
SendByAllowDiscardedChannel
}
最简单的,我们可以将前面提到的消息发送的优先级与DatagramSendingType做一个基本的映射即可。如果需要更复杂的控制,可以自己实现IDatagramSendingTypeDecider接口。
八.UDP服务端引擎接口IUdpServerEngine
前面已经介绍了UDP引擎接口和服务端引擎接口,而IUdpServerEngine接口只要从它们两个继承就OK了:
public interface IUdpServerEngine :IServerEngine ,IUdpEngine
{
}
九.UDP客户端引擎接口IUdpPassiveEngine
UDP客户端引擎接口也一样简单:
public interface IUdpPassiveEngine : IPassiveEngine, IUdpEngine
{
/// <summary>
/// 向指定的端点发送UDP消息。注意:如果引擎已经停止,则直接返回。
/// </summary>
/// <param name="message">要发送的消息</param>
/// <param name="address">目标端点</param>
/// <param name="dataPriority">消息优先级。</param>
void SendMessage(IMessage message, IPEndPoint address, DataPriority dataPriority);
}
新增加的SendMessage方法背后的含义是:在使用UDP客户端引擎时,我们不仅可以向服务端发送消息,而且还可以向其它的任何端点发送消息。基于UDP的P2P通道就需要通过这个方法来向其它客户端发送消息的。
本文已经介绍了ESFramework中提供的所有的通信引擎接口,关于通信引擎的实现和如何使用,我们将在下篇文章中介绍,毕竟,本文的内容已经够多了。
未完,待续......