IProtocolFactory
编码的工作不应该涉及到业务,也就是说编写业务代码不应该掺杂将对象序列化为byte[]的代码。使用框架的开发人员应该是能够使用.Write(【任意对象类型】obj)而不是只能.Write(【基本数据类型】data)。
所以编码的部分放在IEncoder中解决,为了支持不同类型对象的编码,我定义了一个泛型工厂接口IProtocolFactory。
public interface IProtocolFactory
{
IEncoder < T > GetEncoder < T > ();
IDecoder GetDecoder();
}
下面是发送数据用的Send方法:
public void Send(T msg) public void Send < T > (T msg)
{
var args = _asyncSendArgsPool.Resolve();
byte [] ret = ProtocolFactory.GetEncoder < T > ().Encode(msg);
args.SetBuffer(ret, 0 , ret.Length);
_socket.SendAsync(args);
}
解码器IDecoder的实现
在前几篇篇文章中,我们的通讯模块已经实现了socket连接、消息的发送与消息的解析,不过消息的解析部分是接口调用,并没有给出确切的协议解析方式。协议的解析式完全可以定制的。我们只需实现IDecoder接口即可。
public interface IDecoder
{
DecodeResult Decodable( byte [] data);
void Decode( byte [] data, out int usedSize, out object msg);
}
在前言中,我给出了目前我所使用的协议格式,基于这个协议格式,我们实现IDecoder如下:
DefaultDecodernamespace Sopaco.Labs.ClientProtocolLib.Protocol
{
public class DefaultDecoder : IDecoder
{
#region IDecoder Members
public DecodeResult Decodable( byte [] data)
{
var buffer = FastBuffer.Wrap(data);
buffer.Clear();
if (buffer.Remaining < 4 )
return DecodeResult.NEED_DATA;
int length = buffer.ReadInt32();
if (buffer.Remaining < length - 4 )
return DecodeResult.NEED_DATA;
/* if (buffer.Remaining > length - 4)
return DecodeResult.NOT_OK; */
return DecodeResult.OK;
}
public void Decode( byte [] data, out int usedSize, out object outMsg)
{
var buffer = FastBuffer.Wrap(data);
var netMsg = new NetworkMessageBase();
int length = buffer.ReadInt32();
netMsg.MsgId = buffer.ReadInt16();
netMsg.ContentData = buffer.ReadBytes(length - ShareVars.MESSAGE_HEADER_LENGTH);
outMsg = netMsg;
usedSize = length;
}
#endregion
}
}
看过上一篇那个AsyncSocketConnector的人会知道IDecoder.Decodable方法会在连接器受到消息时被调用,我们回顾一下
当AsyncSocketConnector接收到数据的时候调用的_socketArgs_Received:
private void _socketArgs_Received(object sender, SocketAsyncEventArgs args)private void _socketArgs_Received( object sender, SocketAsyncEventArgs args)
{
var socket = sender as Socket;
if (args.SocketError == SocketError.Success)
{
// check if the buffer is full used(ByteTransfered, OffSet)
var bufferStatus = args.UserToken as BufferStatus;
bufferStatus.Buffer.Append(args.Buffer, args.Offset /* bufferStatus.ReadOffset */ , args.BytesTransferred);
bufferStatus.IncreaseTransfered(args.BytesTransferred);
var decoder = ProtocolFactory.GetDecoder();
var data = bufferStatus.Buffer.copyAvaliableBytes();
var result = decoder.Decodable(data);
switch (result)
{
case DecodeResult.NEED_DATA:
socket.ReceiveAsync(args);
break ;
case DecodeResult.NOT_OK:
throw new BadImageFormatException();
case DecodeResult.OK:
int usedSize;
object netMsg = null ;
decoder.Decode(data, out usedSize, out netMsg);
ThreadPool.QueueUserWorkItem(Handler.MessageReceived, netMsg);
bufferStatus.Buffer.Reset();
bufferStatus.Buffer.Append(data, usedSize, data.Length - usedSize);
socket.ReceiveAsync(args);
break ;
}
}
}
传过来的data数据是本次接收到数据与上一次解析过程中剩余的数据的拼接。我们先使用自己的FastBuffer包装一下byte[]以方便从中读取数据。
之后检查消息是否>4(消息长度为4个字节),只有读出了这4个字节的数据才能知道这条消息到底有多长。之后计算“buffer.Remaining < length - 4”
如果剩余数据没有读完,仍返回NEED_DATA,如果剩余消息长度>所需消息长度,ok,我们能从中获得一条完整的消息。返回OK即可。
之后框架会判断返回结果:
如果是NEED_DATA,它会再次接受消息。如果是OK,调用Decoder.Decode方法,这里有个参数需要注意:”out usedSize”,这是IDecoder中必须设置的一个值,它会告诉连接器你到底读了多少个数据,剩余的数据会被存储以在下一次接受到数据时进行拼接。
为其他模块提供消息处理接口
消息能够解析了,也能够通过”ThreadPool.QueueUserWorkItem(Handler.MessageReceived, netMsg)”这样的方式传递个IoHandler来处理。但是毕竟我们不可能将所有的业务都写在IoHandler的MessageReceived中。好的,我们再设计IDealer和IDealersFactory工厂:
public interface IDealer
{
void deal(byte[] netMsg);
}
interface IDealersFactory
{
void Deal(Sopaco.Labs.ClientConnLib.Message.NetworkMessageBase netMsg);
void Register(short msgId, IDealer dealer);
void UnRegister(short msgId, IDealer dealer);
}
好了,现在外部模块只需调用IDealersFactory.Register即可注册消息的处理。
修改后的MessageReceived方法
public void MessageReceived(object msg)
{
var netMsg = msg as NetworkMessageBase;
GodDealer.Deal(netMsg);
}
完整的IDealersFactory实现
GodDealerpublic class GodDealer : IDealersFactory
{
private ConcurrentDictionary < short , ConcurrentBag < IDealer >> _msgObservers = new ConcurrentDictionary < short ,ConcurrentBag < IDealer >> ();
public void Deal(NetworkMessageBase netMsg)
{
short msgId = netMsg.MsgId;
if ( ! _msgObservers.ContainsKey(msgId))
{
Console.WriteLine( string .Format( " undeal message id:{0} " , msgId));
// System.Diagnostics.Debug.WriteLine(string.Format("undeal message id:{0}", msgId));
return ;
}
foreach (var dealer in _msgObservers[msgId])
{
dealer.deal(netMsg.ContentData);
}
}
public void Register( short msgId, IDealer dealer)
{
if ( ! _msgObservers.ContainsKey(msgId))
_msgObservers[msgId] = new ConcurrentBag < IDealer > ();
if ( ! _msgObservers[msgId].Contains(dealer))
_msgObservers[msgId].Add(dealer);
}
public void UnRegister( short msgId, IDealer dealer)
{
if ( ! _msgObservers.ContainsKey(msgId))
return ;
if (_msgObservers[msgId].Contains(dealer))
{
if (_msgObservers[msgId].TryTake( out dealer))
throw new InvalidOperationException();
}
}
}
代码下载