之前的文章讲述了socket通信的一些基本知识,已经本人自定义的C#版本的socket、和java netty 库的二次封装,但是没有真正的发表测试用例。
本文只是为了讲解利用protobuf 进行C# 和 java的通信。以及完整的实例代码
java 代码 svn 地址,本人开发工具是NetBeans 8.0.2 使用 maven 项目编译
http://code.taobao.org/svn/flynetwork_csharp/trunk/BlogTest
c# 代码 svn 地址 使用的是 vs 2013 .net 4.5
http://code.taobao.org/svn/flynetwork_csharp/trunk/Flynetwork/BlogTest
编译工具下载
http://files.cnblogs.com/files/ty408/Sz.ExcelTools.zip
本文着重以C# socket作为服务器端,java netty作为socket的客户端进行访问通信
首先附上proto的message文件
package Sz.Test.ProtoMessage; //登陆消息 message TestMessage { //消息枚举 enum Proto_Login { ResTip = 101201;//服务器推送提示 ReqLogin = 101102;//客户端申请登陆 ReqChat = 101103;//客户端申请聊天消息 ResChat = 101203;//服务器推送聊天消息 } //服务器推送提示 ResTip message ResTipMessage { required string msg = 1;//提示内容 } //客户端申请登陆 ReqLogin message ReqLoginMessage { required string userName = 1;//登陆用户名 required string userPwd = 2;//登陆密码 } //客户端申请登陆 ReqChat message ReqChatMessage { required string msg = 1;//提示内容 } //客户端申请登陆 ResChat message ResChatMessage { required string msg = 1;//提示内容 } }
本人编译工具自带生产消息,和对应的handler
先把proto文件编译生产后,放到哪里,然后创建服务器监听代码
上一篇文章讲到由于java和C#默认网络端绪不一样,java是标准端绪大端序,C#使用的小端序。
MarshalEndian.JN = MarshalEndian.JavaOrNet.Java; Sz.Network.SocketPool.ListenersBox.Instance.SetParams(new MessagePool(), typeof(MarshalEndian)); Sz.Network.SocketPool.ListenersBox.Instance.Start("tcp:*:9527");
所以在我开启服务器监听的时候设置解码器和编码器的解析风格为java
然后建立一个文件chat文件夹用于存放handler文件就是刚才工具生成 目录下的 ExcelSource\protobuf\net\Handler
这一系列文件
if (message.MsgID == (int)Sz.Test.ProtoMessage.TestMessage.Proto_Login.ReqLogin) { //构建消息 Sz.Test.ProtoMessage.TestMessage.ReqLoginMessage loginmessage = new Test.ProtoMessage.TestMessage.ReqLoginMessage(); object msg = DeSerialize(message.MsgBuffer, loginmessage); //构建handler Test.ProtoMessage.ReqLoginHandler handler = new Test.ProtoMessage.ReqLoginHandler(); handler.Session = client; handler.Message = loginmessage; //把handler交给 登录 线程处理 ThreadManager.Instance.AddTask(ServerManager.LoginThreadID, handler); } else if (message.MsgID == (int)Sz.Test.ProtoMessage.TestMessage.Proto_Login.ReqChat) { //构建消息 Sz.Test.ProtoMessage.TestMessage.ReqChatMessage loginmessage = new Test.ProtoMessage.TestMessage.ReqChatMessage(); object msg = DeSerialize(message.MsgBuffer, loginmessage); //构建handler Test.ProtoMessage.ReqChatHandler handler = new Test.ProtoMessage.ReqChatHandler(); handler.Session = client; handler.Message = loginmessage; //把handler交给 聊天 线程处理 ThreadManager.Instance.AddTask(ServerManager.ChatThreadID, handler); }
收到消息后的处理判断传入的消息id是什么类型的,然后对应反序列化byte[]数组为消息
最后把消息和生成handler移交到对应的线程处理
登录的消息全部交给 LoginThread 线程 去处理 ,这样在真实的运行环境下,能保证单点登录问题;
聊天消息全部交给 ChatThread 线程 去处理 这样的好处是,聊天与登录无关;
收到登录消息的处理
public class ReqLoginHandler : TcpHandler { public override void Run() { var message = (Sz.Test.ProtoMessage.TestMessage.ReqLoginMessage)this.Message; Sz.Test.ProtoMessage.TestMessage.ResTipMessage tip = new TestMessage.ResTipMessage(); if (message.userName == "admin" && message.userPwd == "admin") { Logger.Debug("收到登录消息 登录完成"); tip.msg = "登录完成"; } else { Logger.Debug("收到登录消息 用户名或者密码错误"); tip.msg = "用户名或者密码错误"; } byte[] buffer = MessagePool.Serialize(tip); this.Session.SendMsg(new Network.SocketPool.SocketMessage((int)Sz.Test.ProtoMessage.TestMessage.Proto_Login.ResTip, buffer)); } }
收到聊天消息的处理
public class ReqChatHandler : TcpHandler { public override void Run() { var message = (Sz.Test.ProtoMessage.TestMessage.ReqChatMessage)this.Message; Logger.Debug("收到来自客户端聊天消息:" + message.msg); Sz.Test.ProtoMessage.TestMessage.ResChatMessage chat = new TestMessage.ResChatMessage(); chat.msg = "服务器广播:" + message.msg; byte[] buffer = MessagePool.Serialize(chat); this.Session.SendMsg(new Network.SocketPool.SocketMessage((int)Sz.Test.ProtoMessage.TestMessage.Proto_Login.ResChat, buffer)); } }
接下来我们构建
java版本基于netty 二次封装的socket客户端
package sz.network.socketpool.nettypool; import Sz.Test.ProtoMessage.Test.TestMessage; import com.google.protobuf.InvalidProtocolBufferException; import io.netty.channel.ChannelHandlerContext; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.logging.Level; import org.apache.log4j.Logger; /** * * @author Administrator */ public class TestClient { static final Logger log = Logger.getLogger(TestClient.class); static NettyTcpClient client = null; public static void main(String[] args) { client = new NettyTcpClient("127.0.0.1", 9527, true, new NettyMessageHandler() { @Override public void channelActive(ChannelHandlerContext session) { log.info("连接服务器成功:"); //构建错误的登录消息 TestMessage.ReqLoginMessage.Builder newBuilder = TestMessage.ReqLoginMessage.newBuilder(); newBuilder.setUserName("a"); newBuilder.setUserPwd("a"); //发送消息 TestClient.client.sendMsg(new NettyMessageBean(TestMessage.Proto_Login.ReqLogin_VALUE, newBuilder.build().toByteArray())); //构建正确的登录消息 TestMessage.ReqLoginMessage.Builder newBuilder1 = TestMessage.ReqLoginMessage.newBuilder(); newBuilder1.setUserName("admin"); newBuilder1.setUserPwd("admin"); TestClient.client.sendMsg(new NettyMessageBean(TestMessage.Proto_Login.ReqLogin_VALUE, newBuilder1.build().toByteArray())); } @Override public void readMessage(NettyMessageBean msg) { try { if (msg.getMsgid() == TestMessage.Proto_Login.ResTip_VALUE) { TestMessage.ResTipMessage tipmessage = TestMessage.ResTipMessage.parseFrom(msg.getMsgbuffer()); log.info("收到提示信息:" + tipmessage.getMsg()); } else if (msg.getMsgid() == TestMessage.Proto_Login.ResChat_VALUE) { TestMessage.ResChatMessage tipmessage = TestMessage.ResChatMessage.parseFrom(msg.getMsgbuffer()); log.info("收到聊天消息:" + tipmessage.getMsg()); } } catch (InvalidProtocolBufferException ex) { log.error("收到消息:" + msg.getMsgid() + " 解析出错:" + ex); } } @Override public void closeSession(ChannelHandlerContext session) { log.info("连接关闭或者连接不成功:"); } @Override public void exceptionCaught(ChannelHandlerContext session, Throwable cause) { log.info("错误:" + cause.toString()); } }); client.Connect(); BufferedReader strin = new BufferedReader(new InputStreamReader(System.in)); while (true) { try { String str = strin.readLine(); //构建聊天消息 TestMessage.ReqChatMessage.Builder chatmessage = TestMessage.ReqChatMessage.newBuilder(); chatmessage.setMsg(str); TestClient.client.sendMsg(new NettyMessageBean(TestMessage.Proto_Login.ReqChat_VALUE, chatmessage.build().toByteArray())); } catch (IOException ex) { } } } }
接下来我们看看效果
我设置了断线重连功能,我们来测试一下,把服务器关闭
可以看到没3秒向服务器发起一次请求;
知道服务器再次开启链接成功
完整的通信示例演示就完了;
代码我不在上传了,请各位使用svn下载好么????
需要注意的是,消息的解码器和编码器,一定要双方都遵守你自己的契约。比如我在编码消息格式的时候先写入消息包的长度,然后跟上消息的id,再是消息的内容
所以解码的时候,先读取一个消息长度,在读取一个消息id,如果本次收到的消息字节数不够长度那么留存起来以用于下一次收到字节数组追加后再一起解析。
这样就能解决粘包的问题。
附上C#版本的解析器
using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; using System.Threading.Tasks; /** * * @author 失足程序员 * @Blog http://www.cnblogs.com/ty408/ * @mail 492794628@qq.com * @phone 13882122019 * */ namespace Sz.Network.SocketPool { public class MarshalEndian : IMarshalEndian { public enum JavaOrNet { Java, Net, } public MarshalEndian() { } public static JavaOrNet JN = JavaOrNet.Net; /// <summary> /// 读取大端序的int /// </summary> /// <param name="value"></param> public int ReadInt(byte[] intbytes) { Array.Reverse(intbytes); ); } /// <summary> /// 写入大端序的int /// </summary> /// <param name="value"></param> public byte[] WriterInt(int value) { byte[] bs = BitConverter.GetBytes(value); Array.Reverse(bs); return bs; } //用于存储剩余未解析的字节数 ); //字节数常量一个消息id4个字节 const long ConstLenght = 4L; public void Dispose() { this.Dispose(true); GC.SuppressFinalize(this); } protected virtual void Dispose(bool flag1) { if (flag1) { IDisposable disposable = this._LBuff as IDisposable; if (disposable != null) { disposable.Dispose(); } } } public byte[] Encoder(SocketMessage msg) { MemoryStream ms = new MemoryStream(); BinaryWriter bw = new BinaryWriter(ms, UTF8Encoding.Default); byte[] msgBuffer = msg.MsgBuffer; if (msgBuffer != null) { switch (JN) { case JavaOrNet.Java: bw.Write(WriterInt(msgBuffer.Length + )); bw.Write(WriterInt(msg.MsgID)); break; case JavaOrNet.Net: bw.Write((Int32)(msgBuffer.Length + )); bw.Write(msg.MsgID); break; } bw.Write(msgBuffer); } else { switch (JN) { case JavaOrNet.Java: bw.Write(WriterInt()); break; case JavaOrNet.Net: bw.Write((Int32)); break; } } bw.Close(); ms.Close(); bw.Dispose(); ms.Dispose(); return ms.ToArray(); } public List<SocketMessage> Decoder(byte[] buff, int len) { //拷贝本次的有效字节 byte[] _b = new byte[len]; Array.Copy(buff, , _b, , _b.Length); buff = _b; ) { //拷贝之前遗留的字节 this._LBuff.AddRange(_b); buff = this._LBuff.ToArray(); this._LBuff.Clear(); ); } List<SocketMessage> list = new List<SocketMessage>(); MemoryStream ms = new MemoryStream(buff); BinaryReader buffers = new BinaryReader(ms, UTF8Encoding.Default); try { byte[] _buff; Label_0073: //判断本次解析的字节是否满足常量字节数 if ((buffers.BaseStream.Length - buffers.BaseStream.Position) < ConstLenght) { _buff = buffers.ReadBytes((int)(buffers.BaseStream.Length - buffers.BaseStream.Position)); this._LBuff.AddRange(_buff); } else { ; switch (JN) { case JavaOrNet.Java: offset = ReadInt(buffers.ReadBytes()); break; case JavaOrNet.Net: offset = buffers.ReadInt32(); break; } //剩余字节数大于本次需要读取的字节数 if (offset <= (buffers.BaseStream.Length - buffers.BaseStream.Position)) { ; switch (JN) { case JavaOrNet.Java: msgID = ReadInt(buffers.ReadBytes()); break; case JavaOrNet.Net: msgID = buffers.ReadInt32(); break; } _buff = buffers.ReadBytes(()); list.Add(new SocketMessage(msgID, _buff)); goto Label_0073; } else { //剩余字节数刚好小于本次读取的字节数 存起来,等待接受剩余字节数一起解析 buffers.BaseStream.Seek(ConstLenght, SeekOrigin.Current); _buff = buffers.ReadBytes((int)(buffers.BaseStream.Length - buffers.BaseStream.Position)); this._LBuff.AddRange(_buff); } } } catch { } finally { buffers.Close(); if (buffers != null) { buffers.Dispose(); } ms.Close(); if (ms != null) { ms.Dispose(); } } return list; } } }
谢谢观赏~!