一个.net客户端通讯框架的设计(四)---协议解码/编码器工厂以及为其他模块提供消息的dealers/trigers

时间:2022-02-13 14:29:56
IProtocolFactory
编码的工作不应该涉及到业务,也就是说编写业务代码不应该掺杂将对象序列化为byte[]的代码。使用框架的开发人员应该是能够使用.Write(【任意对象类型】obj)而不是只能.Write(【基本数据类型】data)。
所以编码的部分放在IEncoder中解决,为了支持不同类型对象的编码,我定义了一个泛型工厂接口IProtocolFactory。
 
 
 
public   interface IProtocolFactory
    {
        IEncoder
< T > GetEncoder < T > ();
        IDecoder GetDecoder();
    }

 

下面是发送数据用的Send方法:
 
 
 
一个.net客户端通讯框架的设计(四)---协议解码/编码器工厂以及为其他模块提供消息的dealers/trigers一个.net客户端通讯框架的设计(四)---协议解码/编码器工厂以及为其他模块提供消息的dealers/trigerspublic void Send (T msg)
public   void Send < T > (T msg)
        {
            var args
= _asyncSendArgsPool.Resolve();
           
byte [] ret = ProtocolFactory.GetEncoder < T > ().Encode(msg);
            args.SetBuffer(ret,
0 , ret.Length);
            _socket.SendAsync(args);
        }

 

解码器IDecoder的实现

在前几篇篇文章中,我们的通讯模块已经实现了socket连接、消息的发送与消息的解析,不过消息的解析部分是接口调用,并没有给出确切的协议解析方式。协议的解析式完全可以定制的。我们只需实现IDecoder接口即可。

 
 
 
public   interface IDecoder
    {
        DecodeResult Decodable(
byte [] data);
       
void Decode( byte [] data, out   int usedSize, out   object msg);
    }

 

在前言中,我给出了目前我所使用的协议格式,基于这个协议格式,我们实现IDecoder如下:

 
 
 
一个.net客户端通讯框架的设计(四)---协议解码/编码器工厂以及为其他模块提供消息的dealers/trigers一个.net客户端通讯框架的设计(四)---协议解码/编码器工厂以及为其他模块提供消息的dealers/trigersDefaultDecoder
namespace Sopaco.Labs.ClientProtocolLib.Protocol
{
   
public   class DefaultDecoder : IDecoder
    {
       
#region IDecoder Members

       
public DecodeResult Decodable( byte [] data)
        {
            var buffer
= FastBuffer.Wrap(data);
            buffer.Clear();
           
if (buffer.Remaining <   4 )
               
return DecodeResult.NEED_DATA;
           
int length = buffer.ReadInt32();
           
if (buffer.Remaining < length -   4 )
               
return DecodeResult.NEED_DATA;
           
/* if (buffer.Remaining > length - 4)
                return DecodeResult.NOT_OK;
*/
           
return DecodeResult.OK;
        }

       
public   void Decode( byte [] data, out   int usedSize, out   object outMsg)
        {
            var buffer
= FastBuffer.Wrap(data);
            var netMsg
=   new NetworkMessageBase();
           
int length = buffer.ReadInt32();
            netMsg.MsgId
= buffer.ReadInt16();
            netMsg.ContentData
= buffer.ReadBytes(length - ShareVars.MESSAGE_HEADER_LENGTH);
            outMsg
= netMsg;
            usedSize
= length;
        }

       
#endregion
    }
}

 

看过上一篇那个AsyncSocketConnector的人会知道IDecoder.Decodable方法会在连接器受到消息时被调用,我们回顾一下
当AsyncSocketConnector接收到数据的时候调用的_socketArgs_Received:
 
 
 
一个.net客户端通讯框架的设计(四)---协议解码/编码器工厂以及为其他模块提供消息的dealers/trigers一个.net客户端通讯框架的设计(四)---协议解码/编码器工厂以及为其他模块提供消息的dealers/trigersprivate void _socketArgs_Received(object sender, SocketAsyncEventArgs args)
private   void _socketArgs_Received( object sender, SocketAsyncEventArgs args)
        {
            var socket
= sender as Socket;
           
if (args.SocketError == SocketError.Success)
            {
               
// check if the buffer is full used(ByteTransfered, OffSet)
                var bufferStatus = args.UserToken as BufferStatus;
                bufferStatus.Buffer.Append(args.Buffer, args.Offset
/* bufferStatus.ReadOffset */ , args.BytesTransferred);
                bufferStatus.IncreaseTransfered(args.BytesTransferred);
                var decoder
= ProtocolFactory.GetDecoder();
                var data
= bufferStatus.Buffer.copyAvaliableBytes();
                var result
= decoder.Decodable(data);
               
switch (result)
                {
                   
case DecodeResult.NEED_DATA:
                        socket.ReceiveAsync(args);
                       
break ;
                   
case DecodeResult.NOT_OK:
                       
throw   new BadImageFormatException();
                   
case DecodeResult.OK:
                       
int usedSize;
                       
object netMsg =   null ;
                        decoder.Decode(data,
out usedSize, out netMsg);
                        ThreadPool.QueueUserWorkItem(Handler.MessageReceived, netMsg);
                        bufferStatus.Buffer.Reset();
                        bufferStatus.Buffer.Append(data, usedSize, data.Length
- usedSize);
                        socket.ReceiveAsync(args);
                       
break ;
                }
            }
        }

 

传过来的data数据是本次接收到数据与上一次解析过程中剩余的数据的拼接。我们先使用自己的FastBuffer包装一下byte[]以方便从中读取数据。
之后检查消息是否>4(消息长度为4个字节),只有读出了这4个字节的数据才能知道这条消息到底有多长。之后计算“buffer.Remaining < length - 4”
如果剩余数据没有读完,仍返回NEED_DATA,如果剩余消息长度>所需消息长度,ok,我们能从中获得一条完整的消息。返回OK即可。
之后框架会判断返回结果:
一个.net客户端通讯框架的设计(四)---协议解码/编码器工厂以及为其他模块提供消息的dealers/trigers 

如果是NEED_DATA,它会再次接受消息。如果是OK,调用Decoder.Decode方法,这里有个参数需要注意:”out usedSize”,这是IDecoder中必须设置的一个值,它会告诉连接器你到底读了多少个数据,剩余的数据会被存储以在下一次接受到数据时进行拼接。

为其他模块提供消息处理接口

消息能够解析了,也能够通过”ThreadPool.QueueUserWorkItem(Handler.MessageReceived, netMsg)”这样的方式传递个IoHandler来处理。但是毕竟我们不可能将所有的业务都写在IoHandler的MessageReceived中。好的,我们再设计IDealer和IDealersFactory工厂:
public interface IDealer
    {
        void deal(byte[] netMsg);
    }
interface IDealersFactory
    {
        void Deal(Sopaco.Labs.ClientConnLib.Message.NetworkMessageBase netMsg);
        void Register(short msgId, IDealer dealer);
        void UnRegister(short msgId, IDealer dealer);
    }
好了,现在外部模块只需调用IDealersFactory.Register即可注册消息的处理。
修改后的MessageReceived方法
public void MessageReceived(object msg)
        {
            var netMsg = msg as NetworkMessageBase;
            GodDealer.Deal(netMsg);
        }
完整的IDealersFactory实现
   
   
   
一个.net客户端通讯框架的设计(四)---协议解码/编码器工厂以及为其他模块提供消息的dealers/trigers一个.net客户端通讯框架的设计(四)---协议解码/编码器工厂以及为其他模块提供消息的dealers/trigersGodDealer
public   class  GodDealer : IDealersFactory
    {
        
private  ConcurrentDictionary < short , ConcurrentBag < IDealer >>  _msgObservers  =   new  ConcurrentDictionary < short ,ConcurrentBag < IDealer >> ();
        
public   void  Deal(NetworkMessageBase netMsg)
        {
            
short  msgId  =  netMsg.MsgId;
            
if  ( ! _msgObservers.ContainsKey(msgId))
            {
                Console.WriteLine(
string .Format( " undeal message id:{0} " , msgId));
                
// System.Diagnostics.Debug.WriteLine(string.Format("undeal message id:{0}", msgId));
                 return ;
            }
            
foreach (var dealer  in  _msgObservers[msgId])
            {
                dealer.deal(netMsg.ContentData);
            }
        }
        
public   void  Register( short  msgId, IDealer dealer)
        {
            
if  ( ! _msgObservers.ContainsKey(msgId))
                _msgObservers[msgId] 
=   new  ConcurrentBag < IDealer > ();
            
if ( ! _msgObservers[msgId].Contains(dealer))
                _msgObservers[msgId].Add(dealer);
        }
        
public   void  UnRegister( short  msgId, IDealer dealer)
        {
            
if  ( ! _msgObservers.ContainsKey(msgId))
                
return ;
            
if (_msgObservers[msgId].Contains(dealer))
            {
                
if  (_msgObservers[msgId].TryTake( out  dealer))
                    
throw   new  InvalidOperationException();
            }
        }
    }

 

代码下载

http://files.cnblogs.com/wJiang/ClientConnLib.rar