一个简易socket通信结构

时间:2021-07-18 00:43:19

服务端

工作需要又需要用到socketTCP通讯,这么多年了,终于稍微能写点了。让我说其实也说不出个啥来,看了很多的异步后稍微对异步socket的导流 endreceive后 再beginreceive 形成一个内循环有了个认识,加上我自己的封包拆包机制,然后再仿那些其它的大多数代码结构弄点onReceive事件进行 收包触发。整个过程就算差不多了 ,基本上是能够可靠运行的 靠谱的 中规中矩的,要说啥创新读到嘛真的谈不上。代码中写了很多low逼注释 也是为了方便自己理解 请无视。下面是server端代码,使用异步机制accept 异步receive ,成员有 clients代表当前在线的客户端 客户端socket包装为EndpointClient ,有onClientAddDel 代表客户端上线掉线事件,有onReceive代表所有客户端的收包事件,clients由于是异步的多线程访问就要涉及多线程管控 所以使用lock ,服务端有sendToAll() 和SendToSomeOne()毫无疑问这也是通过调用特定的clients来做的。

以下是服务端代码

1 public class MsgServerSchedule
2 {
3
4
5 Socket serverSocket;
6 public Action<List<string>> onClientAddDel;
7 public Action<Telegram_Base> onReceive;
8 bool _isRunning = false;
9
10
11 int port;
12
13 public TelgramType telType;
14
15 static List<EndpointClient> clients;
16
17 public bool isRunning { get { return _isRunning; } }
18 public MsgServerSchedule(int _port)
19 {
20 //any 就决定了 ip地址格式是v4
21 //IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, 7654);
22 //socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
23
24 this.port = _port;
25
26 clients = new List<EndpointClient>();
27
28 Console.WriteLine("constructor");
29
30 }
31
32 public void Start()
33 {
34 try
35 {
36 IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, port);
37 serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
38 serverSocket.Bind(endPoint);
39 serverSocket.Listen(port);
40
41 serverSocket.BeginAccept(new AsyncCallback(AcceptCallback), serverSocket);
42
43 _isRunning = true;
44 Console.WriteLine("start");
45 }
46 catch (Exception ex)
47 {
48 _isRunning = false;
49 serverSocket = null;
50
51 Console.WriteLine("服务启动出现错误,可能端口已被占用:"+port);
52 Console.WriteLine(ex.Message);
53 }
54
55 }
56
57 public void Stop()
58 {
59 for (int i = 0; i < clients.Count; i++)
60 {
61 clients[i].Close();
62 }
63 ClientAddDelGetList(null, EndPointClientsChangeType.ClearAll);
64 serverSocket.Close();
65 _isRunning = false;
66 }
67
68 public void SendToAll(Telegram_Base tel)
69 {
70 for (int i = 0; i < clients.Count; i++)
71 {
72 clients[i].Send(tel);
73 }
74 }
75
76 public void SendToSomeOne(Telegram_Base tel)
77 {
78 for (int i = 0; i < clients.Count; i++)
79 {
80 if(clients[i].remoteIPPort==tel.remoteIPPort)
81 {
82 clients[i].Send(tel);
83 break;
84 }
85 }
86 }
87
88 //新增与删除客户端 秉持原子操作
89 List<string> ClientAddDelGetList(EndpointClient cli, EndPointClientsChangeType changeType)
90 {
91 //异步同时有新客户端上线 与下线 不进行资源互斥访问会报错
92 lock (this)
93 {
94 if (changeType == EndPointClientsChangeType.Add)
95 {
96 clients.Add(cli);
97 }
98 else if(changeType== EndPointClientsChangeType.Del)
99 {
100 var beRemoveClient = clients.First(r => r.remoteIPPort == cli.remoteIPPort);
101 if (beRemoveClient != null)
102 clients.Remove(beRemoveClient);
103 }
104 else if(changeType== EndPointClientsChangeType.ClearAll)
105 {
106 clients.Clear();
107 }
108 else if (changeType == EndPointClientsChangeType.GetAll)
109 {
110 List<string> onLines = new List<string>(clients.Count);
111 for (int i = 0; i < clients.Count; i++)
112 {
113 onLines.Add(clients[i].remoteIPPort);
114 }
115 return onLines;
116 }
117 else
118 {
119 return null;
120 }
121 }
122 return null;
123 }
124 //异步监听客户端 有客户端到来时的回调
125 private void AcceptCallback(IAsyncResult iar)
126 {
127 //server端一直在receive 能够感知到客户端掉线 (连上后 关闭客户端 server立即有错误爆出)
128 //但是同情况 关闭server端 客户端无错误爆出 直到点发送 才有错误爆出
129 //由此得出 处于receive才会有掉线感知 ,send时发现发不出去自然也会有感知 跟人的正常思维理解是一样的
130 //虽然tcp是所谓的长连接 ,通过反复测试 ->但是双方相互都处在一个静止状态 是无法 确定在不在的
131 //连上后平常的情况下 并没有数据流通 的 ,双方只是一个状态的保持而已。
132 //这也是为什么 好多服务 客户端 程序 都有个心跳机制(由此我们可以想到继续完善 弄个客户端列表 心跳超时的剔除列表 正常发消息send 或receive 异常的剔除列表 删除clientSocket
133 //其实非要说吧 一般情况 服务端一直在receive 不用心跳其实也是可以的(客户端可能是真的需要
134 //tcp底层就已经有了一个判断对方在不在的机制 , 对方直接关程序 结束进程 这些 只要tcp在receive就立即能够感知 所以说心跳 用不用看情况吧
135
136 //tcp 不会丢包 哪怕是连接建立了 你还没开始receive 对方却先发了,
137 //对方只要是发了的数据 都由操作系统像个缓存样给你放那的 不会掉 你再隔10秒开始receive都能rec的到
138
139 //tcp甚至在拔掉网线 再重新插上 都可以保证数据一致性
140 //tcp的包顺序能够保证 先发的先到
141
142 //nures代码中那些beginreceivexxx 异步receive的核心机制就是 ,假定数据到的时候把数据保存到xxx数组
143 //真正endreceive的时候 其实数据已经接收 处理完成了
144
145 try
146 {
147
148 //处理完当前accept
149 Socket currentSocket = serverSocket.EndAccept(iar);
150
151 EndpointClient client = new EndpointClient(currentSocket,telType);
152
153 //新增客户端
154 ClientAddDelGetList(client, EndPointClientsChangeType.Add);
155
156 if (onClientAddDel != null)
157 {
158 List<string> onlines = ClientAddDelGetList(null, EndPointClientsChangeType.GetAll);
159 onClientAddDel(onlines);
160
161 //客户端异常掉线
162 client.onClientDel = new Action<string>((_remoteIPPort) =>
163 {
164 ClientAddDelGetList(new EndpointClient(){ remoteIPPort=_remoteIPPort} , EndPointClientsChangeType.Del);
165
166 List<string> onlines2 = ClientAddDelGetList(null, EndPointClientsChangeType.GetAll);
167 onClientAddDel(onlines2);
168 });
169 }
170
171
172
173 //这句到时调用完成后 就会自动把 receivebuffer填充 //要接收的字节数 系统底层操作一次接收多少字节 其实意义不大
174 //总是从0开始(就是说并发时会覆盖
175
176 Console.WriteLine(string.Format("new client ->{0}", currentSocket.RemoteEndPoint.ToString()));
177
178 //currentSocket.Close();
179 //Application.Exit();
180
181 //Thread.Sleep(1000 * 10);
182 client.onReceive += this.onReceive;
183
184 client.BeginReceive();
185
186
187 //立即开始accept新的客户端
188 if (isRunning == true && serverSocket != null)
189 serverSocket.BeginAccept(AcceptCallback, serverSocket);
190 //beginAccept 最开始的方法可以不一样 ,但最终肯定是一个不断accept的闭环过程
191 //整个过程就像个导流样 ,最开始用异步导流到一个固定的点 然后让其循环源源不断运转
192
193 //加asynccallback 有什么不一样么
194 //socket.BeginAccept(new AsyncCallback( AcceptCallback), socket);
195
196 }
197 catch (Exception ex)
198 {
199 Console.WriteLine("AcceptCallback Error");
200 Console.WriteLine(ex.Message);
201 }
202
203 }
204
205
206 }

EndpointClient终端代码代表客户端的对口人,他的onReceive 等资源从服务端继承 ,如果服务端想给某个特定客户端发数据则会调用他们中的某一个 毫无疑问这是通过remoteIPport来判断的,这些都是编写基本socket结构轻车熟路的老套路

以下EndpointClient代码

1 public class EndpointClient
2 {
3 Socket workingSocket;
4 static int receiveBufferLenMax = 5000;
5 byte[] onceReadDatas = new byte[receiveBufferLenMax];
6 List<byte> receiveBuffer = new List< byte>(receiveBufferLenMax);
7
8 public string remoteIPPort { get; set; }
9
10 //当前残留数据区 接收数据的起始指针(也代表缓冲区数据长度
11 int receiveBufferLen = 0;
12
13
14 TelgramType telType;
15
16 public Action<Telegram_Base> onReceive;
17 public Action<string> onClientDel;
18
19 public EndpointClient()
20 {
21
22 }
23 public EndpointClient(Socket _socket,TelgramType _telType)
24 {
25 this.remoteIPPort = _socket.RemoteEndPoint.ToString();
26 this.telType = _telType;
27 workingSocket = _socket;
28 }
29
30 public void Send(Telegram_Base tel)
31 {
32 //try
33 //{
34 if(workingSocket==null)
35 {
36 Console.WriteLine("未初始化的EndpointClient");
37 return;
38 }
39 if (tel is Telegram_Schedule)
40 {
41 Telegram_Schedule telBeSend = tel as Telegram_Schedule;
42 if (telBeSend.dataBytes.Length != telBeSend.dataLen)
43 {
44 Console.WriteLine("尝试发送数据长度格式错误的报文");
45 return;
46 }
47
48 byte[] sendBytesHeader = telBeSend.dataBytesHeader;
49 byte[] sendbytes = telBeSend.dataBytes;
50
51 //数据超过缓冲区长度 会导致无法拆包
52 if (sendbytes.Length <= receiveBufferLenMax)
53 {
54 workingSocket.BeginSend(sendBytesHeader, 0, sendBytesHeader.Length, 0, null, null);
55 workingSocket.BeginSend(sendbytes, 0, sendbytes.Length, 0,null,null
56
57 );
58 }
59 else
60 {
61 Console.WriteLine("发送到调度客户端的数据超过缓冲区长度");
62 throw new Exception("发送到调度客户端的数据超过缓冲区长度");
63 }
64
65 }
66 else if (tel is Telegram_SDBMsg)
67 {
68
69 }
70
71 //}
72 //catch (Exception ex)
73 //{
74
75 // Console.WriteLine(ex.Message);
76 // throw ex;
77 //}
78 }
79
80 public void BeginReceive()
81 {
82 if (workingSocket == null)
83 {
84 Console.WriteLine("未初始化的EndpointClient");
85 return;
86 }
87
88 receiveBufferLen = 0;
89 workingSocket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax, SocketFlags.None,
90 ReceiveCallback,
91 this);
92 }
93 private void ReceiveCallback(IAsyncResult iar)
94 {
95 try
96 {
97 EndpointClient cli = (EndpointClient)iar.AsyncState;
98 Socket socket = cli.workingSocket;
99 int reads = socket.EndReceive(iar);
100
101 if (reads > 0)
102 {
103
104 for (int i = 0; i < reads; i++)
105 {
106 receiveBuffer.Add(onceReadDatas[i]);
107 }
108
109 //具体填充了多少看返回值 此时 数据已经在buffer中了
110 receiveBufferLen += reads;
111 //加完了后解析 阻塞式处理 结束后开启新的接收
112 SloveTelData();
113
114 if (receiveBufferLenMax - receiveBufferLen > 0)
115 {
116 //接收完了 继续beginreceive 开启异步的下次接收 (如果缓冲区有残留数据 则接收长度变短 ,没接收到的让其留在socket不会丢失 下次接收)
117 socket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax - receiveBufferLen, SocketFlags.None, ReceiveCallback, this);
118 }
119 else//阻塞式处理都完成一遍了 都还没清理出任何缓冲区空间 毫无疑问 整体运转机制已经挂了 不用beginreceive下一次了
120 {
121 Close();
122 //移除自己
123 if (onClientDel != null)
124 {
125 onClientDel(remoteIPPort);
126 }
127 Console.WriteLine("服务端接口解析数据出现异常");
128 throw new Exception("服务端接口解析数据出现异常");
129 }
130 }
131 else//reads==0 客户端已关闭
132 {
133 Close();
134 //移除自己
135 if (onClientDel != null)
136 {
137 onClientDel(remoteIPPort);
138 }
139 }
140 }
141 catch (Exception ex)
142 {
143 Close();
144 //移除自己
145 if (onClientDel != null)
146 {
147 onClientDel(remoteIPPort);
148 }
149
150 Console.WriteLine("ReceiveCallback Error");
151 Console.WriteLine(ex.Message);
152 }
153
154 }
155 void SloveTelData()
156 {
157 //进行数据解析
158 SloveTelDataUtil slo = new SloveTelDataUtil();
159
160 if (telType == TelgramType.Schedule)
161 {
162 List<Telegram_Schedule> dataEntitys = slo.Slove_Telegram_Schedule(receiveBuffer, receiveBufferLen, this.remoteIPPort);
163 //buffer已经被处理一遍了 使用新的长度
164 receiveBufferLen = receiveBuffer.Count;
165 //解析出的每一个对象都触发 onreceive
166 for (int i = 0; i < dataEntitys.Count; i++)
167 {
168 if (onReceive != null)
169 onReceive(dataEntitys[i]);
170 }
171 }
172 else if (telType == TelgramType.SDBMsg)
173 {
174
175 }
176
177 }
178
179
180 public void Close()
181 {
182 try
183 {
184 receiveBuffer.Clear();
185 receiveBufferLen = 0;
186 if (workingSocket != null && workingSocket.Connected)
187 workingSocket.Close();
188 }
189 catch (Exception ex)
190 {
191 Console.WriteLine(ex.Message);
192 }
193
194 }
195 }

数据拆包与封包粘包处理

上面的代码可以看到 数据包处理都在receiveCallback里 SloveTelData,也是通用的套路 ,解析到完整的包后从缓冲区移除 解析多少个包触发多少次事件,残余数据留在缓冲区 然后继续开始新的beginReceive往缓冲区加。在异步机制中 到达endReceive的时候数据已经在缓冲区里了,这个自不用多说噻。数据包和粘包逻辑在公共类库里供客户端服务端共同调用

以下是粘包处理逻辑

1 public class SloveTelDataUtil
2 {
3 List<Telegram_Schedule> solveList;
4 public SloveTelDataUtil()
5 {
6 }
7
8 List<byte> buffer;
9 int bufferLen;
10 int bufferIndex = 0;
11 string remoteIPPort;
12 public List<Telegram_Schedule> Slove_Telegram_Schedule( List< byte> _buffer,int _bufferLen,string _remoteIPPort)
13 {
14
15 solveList = new List<Telegram_Schedule>();
16
17 bufferIndex = 0;
18
19 buffer = _buffer;
20 bufferLen = _bufferLen;
21 remoteIPPort = _remoteIPPort;
22
23 //小于最小长度 直接返回
24 if (bufferLen < 12)
25 return solveList;
26
27 //进行数据解析
28 bool anaysisOK = false;
29 while (anaysisOK=AnaysisData_Schedule()==true)//直到解析的不能解析为止
30 {
31 }
32 return solveList;
33 }
34
35 public bool AnaysisData_Schedule()
36 {
37 if (bufferLen - bufferIndex < GlobalSymbol.Headerlen)
38 return false;
39
40 //解析出一个数据对象
41 Telegram_Schedule telObj = new Telegram_Schedule();
42
43 //必定是大于最小数据大小的
44 telObj.dataBytesHeader = new byte[GlobalSymbol.Headerlen];
45 buffer.CopyTo(bufferIndex, telObj.dataBytesHeader, 0, GlobalSymbol.Headerlen);
46
47 byte[] btsHeader = new byte[4];
48 byte[] btsCommand = new byte[4];
49 byte[] btsLen = new byte[4];
50
51 btsHeader[0] = buffer[bufferIndex];
52 btsHeader[1] = buffer[bufferIndex+1];
53 btsHeader[2] = buffer[bufferIndex+2];
54 btsHeader[3] = buffer[bufferIndex+3];
55
56 bufferIndex += 4;
57
58 btsCommand[0] = buffer[bufferIndex];
59 btsCommand[1] = buffer[bufferIndex + 1];
60 btsCommand[2] = buffer[bufferIndex + 2];
61 btsCommand[3] = buffer[bufferIndex + 3];
62
63 bufferIndex += 4;
64
65 btsLen[0] = buffer[bufferIndex];
66 btsLen[1] = buffer[bufferIndex + 1];
67 btsLen[2] = buffer[bufferIndex + 2];
68 btsLen[3] = buffer[bufferIndex + 3];
69
70 bufferIndex += 4;
71
72
73
74 int dataLen = BitConverter.ToInt32(btsLen, 0);
75 telObj.header = BitConverter.ToUInt32(btsHeader, 0);
76 telObj.command = BitConverter.ToInt32(btsCommand, 0);
77 telObj.remoteIPPort = remoteIPPort;
78
79 if(dataLen>0)
80 {
81 //数据区小于得到的数据长度 说明数据部分还没接收到 不删除缓冲区 不做任何处理
82 //下次来了连着头一起解析
83 if (bufferLen - GlobalSymbol.Headerlen < dataLen)
84 {
85
86 bufferIndex -= 12;//
87
88
89 return false;
90
91 }
92 else
93 {
94
95 telObj.dataLen = dataLen;
96 telObj.dataBytes = new byte[dataLen];
97 buffer.CopyTo(bufferIndex, telObj.dataBytes, 0, dataLen);
98
99 solveList.Add(telObj);
100 //bufferIndex += dataLen;
101
102 //解析成功一次 移除已解析的
103 for (int i = 0; i < GlobalSymbol.Headerlen+dataLen; i++)
104 {
105 buffer.RemoveAt(0);
106 }
107 bufferIndex = 0;
108 bufferLen = buffer.Count;
109 return true;
110 }
111 }
112 else
113 {
114
115 telObj.dataLen = 0;
116 solveList.Add(telObj);
117 //bufferIndex += 0;
118 //解析成功一次 移除已解析的
119 for (int i = 0; i < GlobalSymbol.Headerlen; i++)
120 {
121 buffer.RemoveAt(0);
122 }
123 //解析成功一次因为移除了缓冲区 bufferIndex置0
124 bufferIndex = 0;
125 bufferLen = buffer.Count;
126 return true;
127 }
128
129 }
130
131
132 public List<Telegram_SDBMsg> Slove_Telegram_SDBMsg(ref byte[] buffer)
133 {
134 return new List<Telegram_SDBMsg>();
135 }
136 }

我们看到用到的数据包对象是Telegram_Schedule ,中间保存有报文数据,数据发送的目标等信息。

以下是数据包结构代码

1 public class Telegram_Base
2 {
3 public string remoteIPPort { get; set; }
4 //数据内容
5 public byte[] dataBytes { get; set; }
6 //头部内容的序列化
7 public byte[] dataBytesHeader { get; set; }
8
9 public string jsonStr { get; set; }
10 virtual public void SerialToBytes()
11 {
12
13 }
14
15 virtual public void SloveToTel()
16 {
17
18 }
19
20 }
21
22 public class Telegram_Schedule:Telegram_Base
23 {
24
25 //头部标识 4字节
26 public UInt32 header { get; set; }
27 //命令对应枚举的 int 4字节
28 public int command { get; set; }
29 //数据长度 4字节
30 public int dataLen { get; set; }
31
32
33
34 override public void SerialToBytes()
35 {
36 //有字符串数据 但是待发送字节是空
37 if ((string.IsNullOrEmpty(jsonStr) == false ))//&& (dataBytes==null || dataBytes.Length==0)
38 {
39 dataBytes = Encoding.UTF8.GetBytes(jsonStr);
40 dataLen = dataBytes.Length;
41 dataBytesHeader = new byte[GlobalSymbol.Headerlen];
42
43 header = GlobalSymbol.HeaderSymbol;
44
45 byte[] btsHeader = BitConverter.GetBytes(header);
46 byte[] btsCommand = BitConverter.GetBytes(command);
47 byte[] btsLen = BitConverter.GetBytes(dataLen);
48
49 Array.Copy(btsHeader, 0, dataBytesHeader, 0, 4);
50 Array.Copy(btsCommand, 0, dataBytesHeader, 4, 4);
51 Array.Copy(btsLen, 0, dataBytesHeader, 8, 4);
52
53 }
54 else if((string.IsNullOrEmpty(jsonStr) == true )&& (dataBytes==null || dataBytes.Length==0)){
55 dataLen = 0;
56 dataBytes = new byte[0];
57
58 dataBytesHeader = new byte[GlobalSymbol.Headerlen];
59
60 header = GlobalSymbol.HeaderSymbol;
61
62 byte[] btsHeader = BitConverter.GetBytes(header);
63 byte[] btsCommand = BitConverter.GetBytes(command);
64 byte[] btsLen = BitConverter.GetBytes(dataLen);
65
66 Array.Copy(btsHeader, 0, dataBytesHeader, 0, 4);
67 Array.Copy(btsCommand, 0, dataBytesHeader, 4, 4);
68 Array.Copy(btsLen, 0, dataBytesHeader, 8, 4);
69 }
70 }
71
72 override public void SloveToTel()
73 {
74 //只解析字符串数据部分 ,header 和len 在接收之初就已解析
75 this.jsonStr = Encoding.UTF8.GetString(this.dataBytes);
76 }
77
78 }

客户端代码

最后是客户端,有了上面的结构,客户端就不足为谈了,稍微了解socket的人都熟知套路的 基本跟EndpointClient一致

1 public class MsgClientSchedule
2 {
3 Socket workingSocket;
4 //缓冲区最大数据长度
5 static int receiveBufferLenMax = 5000;
6 //单次receive数据(取决于tcp底层封包 但是不会超过缓冲区最大长度
7 byte[] onceReadDatas = new byte[receiveBufferLenMax];
8 //未解析到完整数据包时的残余数据保存区
9 List<byte> receiveBuffer = new List<byte>(receiveBufferLenMax);
10
11 string serverIP { get; set; }
12 int serverPort { get; set; }
13 public string localIPPort { get; set; }
14
15 //残余缓冲区数据长度
16 int receiveBufferLen = 0;
17
18 bool _isConnected { get; set; }
19
20 TelgramType telType;
21
22 //收一个包时触发
23 public Action<Telegram_Base> onReceive;
24 //与服务端断链时触发
25 public Action<string> onClientDel;
26
27
28 public bool isConnected { get { return _isConnected; } }
29 public MsgClientSchedule(string _serverIP,int _port)
30 {
31 serverIP = _serverIP;
32 serverPort = _port;
33 _isConnected = false;
34 }
35
36 public void Connect()
37 {
38 try
39 {
40 workingSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.IP);
41 IPEndPoint ipport = new IPEndPoint(IPAddress.Parse(serverIP), serverPort);
42 workingSocket.Connect(ipport);
43
44 localIPPort = workingSocket.LocalEndPoint.ToString();
45 _isConnected = true;
46 BeginReceive();
47 }
48 catch (Exception ex)
49 {
50 workingSocket = null;
51 _isConnected = false;
52
53 Console.WriteLine(ex.Message);
54 }
55
56 }
57
58
59
60
61 public void Send(Telegram_Base tel)
62 {
63 try
64 {
65 if(_isConnected==false)
66 {
67 Console.WriteLine("未连接到服务器");
68 return;
69 }
70 if (tel is Telegram_Schedule)
71 {
72 Telegram_Schedule telBeSend = tel as Telegram_Schedule;
73 if (telBeSend.dataBytes.Length != telBeSend.dataLen)
74 {
75 Console.WriteLine("尝试发送数据长度格式错误的报文");
76 return;
77 }
78 byte[] sendBytesHeader = telBeSend.dataBytesHeader;
79 byte[] sendbytes = telBeSend.dataBytes;
80
81 //数据超过缓冲区长度 会导致无法拆包
82 if (sendbytes.Length <= receiveBufferLenMax)
83 {
84 workingSocket.BeginSend(sendBytesHeader, 0, sendBytesHeader.Length, 0, null, null);
85 workingSocket.BeginSend(sendbytes, 0, sendbytes.Length, 0, null, null
86
87 );
88 }
89 else
90 {
91 Console.WriteLine("发送到调度客户端的数据超过缓冲区长度");
92 throw new Exception("发送到调度客户端的数据超过缓冲区长度");
93 }
94
95
96 }
97 else if (tel is Telegram_SDBMsg)
98 {
99
100 }
101
102 }
103 catch (Exception ex)
104 {
105 Close();
106 Console.WriteLine(ex.Message);
107 //throw ex;
108 }
109 }
110
111 public void BeginReceive()
112 {
113 receiveBufferLen = 0;
114 workingSocket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax, SocketFlags.None,
115 ReceiveCallback,
116
117 this);
118 }
119 private void ReceiveCallback(IAsyncResult iar)
120 {
121 try
122 {
123 MsgClientSchedule cli = (MsgClientSchedule)iar.AsyncState;
124 Socket socket = cli.workingSocket;
125 int reads = socket.EndReceive(iar);
126
127 if (reads > 0)
128 {
129
130 for (int i = 0; i < reads; i++)
131 {
132 receiveBuffer.Add(onceReadDatas[i]);
133 }
134
135 //具体填充了多少看返回值 此时 数据已经在buffer中了
136
137 receiveBufferLen += reads;
138
139 //加完了后解析 阻塞式处理 结束后开启新的接收
140 SloveTelData();
141
142
143
144 if (receiveBufferLenMax - receiveBufferLen > 0)
145 {
146 //接收完了 继续beginreceive 开启异步的下次接收 (如果缓冲区有残留数据 则接收长度变短 ,没接收到的让其留在socket不会丢失 下次接收)
147 socket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax - receiveBufferLen, SocketFlags.None, ReceiveCallback, this);
148 }
149 else//阻塞式处理都完成一遍了 都还没清理出任何缓冲区空间 毫无疑问 整体运转机制已经挂了 不用beginreceive下一次了
150 {
151 Close();
152
153 Console.WriteLine("服务端接口解析数据出现异常");
154 throw new Exception("服务端接口解析数据出现异常");
155 }
156 }
157 else//reads==0客户端已关闭
158 {
159 Close();
160 }
161 }
162 catch (Exception ex)
163 {
164 Close();
165
166 Console.WriteLine("ReceiveCallback Error");
167 Console.WriteLine(ex.Message);
168 }
169
170 }
171 private void SloveTelData()
172 {
173
174 //进行数据解析
175 SloveTelDataUtil slo = new SloveTelDataUtil();
176
177 if (telType == TelgramType.Schedule)
178 {
179 List<Telegram_Schedule> dataEntitys = slo.Slove_Telegram_Schedule(receiveBuffer, receiveBufferLen,serverIP+":"+serverPort.ToString());
180 //buffer已经被处理一遍了 使用新的长度
181 receiveBufferLen = receiveBuffer.Count;
182 //解析出的每一个对象都触发 onreceive
183 for (int i = 0; i < dataEntitys.Count; i++)
184 {
185 if (onReceive != null)
186 onReceive(dataEntitys[i]);
187 }
188 }
189 else if (telType == TelgramType.SDBMsg)
190 {
191
192 }
193
194 }
195
196
197 public void Close()
198 {
199 try
200 {
201 _isConnected = false;
202
203 receiveBuffer.Clear();
204 receiveBufferLen = 0;
205 if (workingSocket != null && workingSocket.Connected)
206 workingSocket.Close();
207 }
208 catch (Exception ex)
209 {
210 Console.WriteLine(ex.Message);
211 }
212
213 }
214
215 }

服务端调用

构建一个winform基本项目

1 List<string> clients;
2 MsgServerSchedule server;
3 private void btn_start_Click(object sender, EventArgs e)
4 {
5 server = new MsgServerSchedule(int.Parse(tbx_port.Text));
6
7
8 server.Start();
9 if (server.isRunning == true)
10 {
11 btn_start.Enabled = false;
12
13 server.onReceive += new Action<Telegram_Base>(
14 (tel) =>
15 {
16 this.BeginInvoke(new Action(() =>
17 {
18 if (tel is Telegram_Schedule)
19 {
20 Telegram_Schedule ts = tel as Telegram_Schedule;
21 ts.SloveToTel();
22 Console.WriteLine(string.Format("commandType:{0}", ((ScheduleTelCommandType)ts.command).ToString()));
23
24 tbx_msgs.Text += ts.remoteIPPort + ">" + ts.jsonStr + "\r\n";
25
26 //数据回发测试
27 string fromip = ts.remoteIPPort;
28 string srcMsg = ts.jsonStr;
29 string fromServerMsg = ts.jsonStr + " -from server";
30 ts.jsonStr = fromServerMsg;
31
32
33 //如果消息里有指向信息 则转送到对应的客户端
34 if (clients != null)
35 {
36 string to = null;
37 for (int i = 0; i < clients.Count; i++)
38 {
39 if (srcMsg.Contains(clients[i]))
40 {
41 to = clients[i];
42 break;
43 }
44 }
45
46 if (to != null)
47 {
48 ts.remoteIPPort = to;
49 string toMsg;
50 //toMsg= srcMsg.Replace(to, "");
51 toMsg = srcMsg.Replace(to, fromip);
52 ts.jsonStr = toMsg;
53 ts.SerialToBytes();
54
55 server.SendToSomeOne(ts);
56 }
57 else
58 {
59 ts.SerialToBytes();
60 server.SendToSomeOne(ts);
61 }
62 }
63 }
64 }));
65
66 }
67 );
68
69 server.onClientAddDel += new Action<List<string>>((onlines) =>
70 {
71 this.BeginInvoke(
72 new Action(() =>
73 {
74 clients = onlines;
75 listbox_clients.Items.Clear();
76
77 for (int i = 0; i < onlines.Count; i++)
78 {
79 listbox_clients.Items.Add(onlines[i]);
80 }
81 }));
82 });
83 }
84 }
85 private void btn_sendAll_Click(object sender, EventArgs e)
86 {
87 Telegram_Schedule tel = new Telegram_Schedule();
88 tel.header = GlobalSymbol.HeaderSymbol;
89 tel.command = (int)ScheduleTelCommandType.StartC2S;
90 tel.jsonStr = tbx_sendAll.Text;
91 tel.SerialToBytes();
92
93 server.SendToAll(tel);
94 }

客户端调用

1 MsgClientSchedule client;
2
3 private void btn_start_Click(object sender, EventArgs e)
4 {
5 client = new MsgClientSchedule(tbx_ip.Text, int.Parse(tbx_port.Text));
6
7 client.Connect();
8
9 if (client.isConnected == true)
10 {
11 btn_start.Enabled = false;
12
13 label1.Text = client.localIPPort;
14
15 client.onReceive = new Action<Telegram_Base>((tel) =>
16 {
17 this.BeginInvoke(
18 new Action(() =>
19 {
20 tel.SloveToTel();
21 tbx_rec.Text += tel.jsonStr + "\r\n";
22
23 }));
24 });
25 }
26
27 }
28
29
30
31 private void btn_send_Click(object sender, EventArgs e)
32 {
33
34 if (client == null || client.isConnected == false)
35 return;
36
37 //for (int i = 0; i < 2; i++)
38 //{
39 Telegram_Schedule tel = new Telegram_Schedule();
40 tel.command = (int)ScheduleTelCommandType.MsgC2S;
41
42 tel.jsonStr = tbx_remoteip.Text+">"+ tbx_msgSend.Text;
43 tel.SerialToBytes();//发出前要先序列化
44
45 client.Send(tel);
46 //}
47
48 }

实现效果

可以多客户端连接互相*发送消息,服务端可以编写转发规则代码,那些什么棋牌啊 互动白板 以及其他类似的应用就可以基于此之上发挥想象了

一个简易socket通信结构