在分布式调度系统中,如果要实现调度服务器与多台计算节点服务器之间通信,采用socket来实现是一种实现方式,当然我们也可以通过数据存储任务,子节点来完成任务,但是往往使用数据作为任务存储都需要定制开发,要维护数据库中任务记录状态等等。开发的东西还是有点多,而且还不够灵活。因此,我个人是比较偏向于使用socket来实现任务的调度工作。原因:使用socket实现调度比较灵活,而且扩展性都比较好。
实现思路:调度服务器要实现调度工作,它必须与所有计算节点之间建立连接。而且他需要知道每台计算节点的任务状况,因此服务器节点必须存储与所有计算节点的socket连接对象。
在客户端唯一需要知道的就是它归属的调度服务器的通信IP和端口,因此client是发送连接的主动方,由调度服务器监听是否有client请求建立连接,当建立连接成功后,把该连接信息存储到一个结合中以便监控client的存货状态及通信使用。
扩展:
由于server端是存储了所有server与client的连接对象,因此我们是可以基于此demo的基础上实现聊天系统:
* 每当一个与用户发言时,是由server接收到的某个用户的发言信息的,此时服务器端可以通过循环发送该用户发送的信息给每个已经连接连接的用户(排除发送者)。
Server端代码(Window Console Project):
1 using System;View Code
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6 using System.Net.Sockets;
7 using System.Net;
8
9 namespace SocketServerAcceptMultipleClient
10 {
11 public class SocketServer
12 {
13 // 创建一个和客户端通信的套接字
14 static Socket socketwatch = null;
15 //定义一个集合,存储客户端信息
16 static Dictionary<string, Socket> clientConnectionItems = new Dictionary<string, Socket> { };
17
18 public static void Main(string[] args)
19 {
20 //定义一个套接字用于监听客户端发来的消息,包含三个参数(IP4寻址协议,流式连接,Tcp协议)
21 socketwatch = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
22 //服务端发送信息需要一个IP地址和端口号
23 IPAddress address = IPAddress.Parse("127.0.0.1");
24 //将IP地址和端口号绑定到网络节点point上
25 IPEndPoint point = new IPEndPoint(address, 8098);
26 //此端口专门用来监听的
27
28 //监听绑定的网络节点
29 socketwatch.Bind(point);
30
31 //将套接字的监听队列长度限制为20
32 socketwatch.Listen(20);
33
34 //负责监听客户端的线程:创建一个监听线程
35 Thread threadwatch = new Thread(watchconnecting);
36
37 //将窗体线程设置为与后台同步,随着主线程结束而结束
38 threadwatch.IsBackground = true;
39
40 //启动线程
41 threadwatch.Start();
42
43 Console.WriteLine("开启监听。。。");
44 Console.WriteLine("点击输入任意数据回车退出程序。。。");
45 Console.ReadKey();
46 Console.WriteLine("退出监听,并关闭程序。");
47 }
48
49 //监听客户端发来的请求
50 static void watchconnecting()
51 {
52 Socket connection = null;
53
54 //持续不断监听客户端发来的请求
55 while (true)
56 {
57 try
58 {
59 connection = socketwatch.Accept();
60 }
61 catch (Exception ex)
62 {
63 //提示套接字监听异常
64 Console.WriteLine(ex.Message);
65 break;
66 }
67
68 //获取客户端的IP和端口号
69 IPAddress clientIP = (connection.RemoteEndPoint as IPEndPoint).Address;
70 int clientPort = (connection.RemoteEndPoint as IPEndPoint).Port;
71
72 //让客户显示"连接成功的"的信息
73 string sendmsg = "连接服务端成功!\r\n" + "本地IP:" + clientIP + ",本地端口" + clientPort.ToString();
74 byte[] arrSendMsg = Encoding.UTF8.GetBytes(sendmsg);
75 connection.Send(arrSendMsg);
76
77 //客户端网络结点号
78 string remoteEndPoint = connection.RemoteEndPoint.ToString();
79 //显示与客户端连接情况
80 Console.WriteLine("成功与" + remoteEndPoint + "客户端建立连接!\t\n");
81 //添加客户端信息
82 clientConnectionItems.Add(remoteEndPoint, connection);
83
84 //IPEndPoint netpoint = new IPEndPoint(clientIP,clientPort);
85 IPEndPoint netpoint = connection.RemoteEndPoint as IPEndPoint;
86
87 //创建一个通信线程
88 ParameterizedThreadStart pts = new ParameterizedThreadStart(recv);
89 Thread thread = new Thread(pts);
90 //设置为后台线程,随着主线程退出而退出
91 thread.IsBackground = true;
92 //启动线程
93 thread.Start(connection);
94 }
95 }
96
97 /// <summary>
98 /// 接收客户端发来的信息,客户端套接字对象
99 /// </summary>
100 /// <param name="socketclientpara"></param>
101 static void recv(object socketclientpara)
102 {
103 Socket socketServer = socketclientpara as Socket;
104
105 while (true)
106 {
107 //创建一个内存缓冲区,其大小为1024*1024字节 即1M
108 byte[] arrServerRecMsg = new byte[1024 * 1024];
109 //将接收到的信息存入到内存缓冲区,并返回其字节数组的长度
110 try
111 {
112 int length = socketServer.Receive(arrServerRecMsg);
113
114 //将机器接受到的字节数组转换为人可以读懂的字符串
115 string strSRecMsg = Encoding.UTF8.GetString(arrServerRecMsg, 0, length);
116
117 //将发送的字符串信息附加到文本框txtMsg上
118 Console.WriteLine("客户端:" + socketServer.RemoteEndPoint + ",time:" + GetCurrentTime() + "\r\n" + strSRecMsg + "\r\n\n");
119
120 socketServer.Send(Encoding.UTF8.GetBytes("测试server 是否可以发送数据给client "));
121 }
122 catch (Exception ex)
123 {
124 clientConnectionItems.Remove(socketServer.RemoteEndPoint.ToString());
125
126 Console.WriteLine("Client Count:" + clientConnectionItems.Count);
127
128 //提示套接字监听异常
129 Console.WriteLine("客户端" + socketServer.RemoteEndPoint + "已经中断连接" + "\r\n" + ex.Message + "\r\n" + ex.StackTrace + "\r\n");
130 //关闭之前accept出来的和客户端进行通信的套接字
131 socketServer.Close();
132 break;
133 }
134 }
135 }
136
137 ///
138 /// 获取当前系统时间的方法
139 /// 当前时间
140 static DateTime GetCurrentTime()
141 {
142 DateTime currentTime = new DateTime();
143 currentTime = DateTime.Now;
144 return currentTime;
145 }
146 }
147 }
Client端代码(Window Form Project):
1 using System;View Code
2 using System.Collections.Generic;
3 using System.ComponentModel;
4 using System.Data;
5 using System.Drawing;
6 using System.Linq;
7 using System.Text;
8 using System.Windows.Forms;
9 using System.Threading;
10 using System.Net.Sockets;
11 using System.Net;
12 using System.Diagnostics;
13
14 namespace SocketClient
15 {
16 public partial class Main : Form
17 {
18 //创建 1个客户端套接字 和1个负责监听服务端请求的线程
19 Thread threadclient = null;
20 Socket socketclient = null;
21
22 public Main()
23 {
24 InitializeComponent();
25
26 StartPosition = FormStartPosition.CenterScreen;
27 //关闭对文本框的非法线程操作检查
28 TextBox.CheckForIllegalCrossThreadCalls = false;
29
30 this.btnSendMessage.Enabled = false;
31 this.btnSendMessage.Visible = false;
32
33 this.txtMessage.Visible = false;
34 }
35
36 private void btnConnection_Click(object sender, EventArgs e)
37 {
38 this.btnConnection.Enabled = false;
39 //定义一个套接字监听
40 socketclient = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
41
42 //获取文本框中的IP地址
43 IPAddress address = IPAddress.Parse("127.0.0.1");
44
45 //将获取的IP地址和端口号绑定在网络节点上
46 IPEndPoint point = new IPEndPoint(address, 8098);
47
48 try
49 {
50 //客户端套接字连接到网络节点上,用的是Connect
51 socketclient.Connect(point);
52 this.btnSendMessage.Enabled = true;
53 this.btnSendMessage.Visible = true;
54 this.txtMessage.Visible = true;
55 }
56 catch (Exception)
57 {
58 Debug.WriteLine("连接失败\r\n");
59
60 this.txtDebugInfo.AppendText("连接失败\r\n");
61 return;
62 }
63
64 threadclient = new Thread(recv);
65 threadclient.IsBackground = true;
66 threadclient.Start();
67 }
68
69 // 接收服务端发来信息的方法
70 void recv()
71 {
72 int x = 0;
73 //持续监听服务端发来的消息
74 while (true)
75 {
76 try
77 {
78 //定义一个1M的内存缓冲区,用于临时性存储接收到的消息
79 byte[] arrRecvmsg = new byte[1024 * 1024];
80
81 //将客户端套接字接收到的数据存入内存缓冲区,并获取长度
82 int length = socketclient.Receive(arrRecvmsg);
83
84 //将套接字获取到的字符数组转换为人可以看懂的字符串
85 string strRevMsg = Encoding.UTF8.GetString(arrRecvmsg, 0, length);
86 if (x == 1)
87 {
88 this.txtDebugInfo.AppendText("服务器:" + GetCurrentTime() + "\r\n" + strRevMsg + "\r\n\n");
89 Debug.WriteLine("服务器:" + GetCurrentTime() + "\r\n" + strRevMsg + "\r\n\n");
90 }
91 else
92 {
93 this.txtDebugInfo.AppendText(strRevMsg + "\r\n\n");
94 Debug.WriteLine(strRevMsg + "\r\n\n");
95 x = 1;
96 }
97 }
98 catch (Exception ex)
99 {
100 Debug.WriteLine("远程服务器已经中断连接" + "\r\n\n");
101 Debug.WriteLine("远程服务器已经中断连接" + "\r\n");
102 break;
103 }
104 }
105 }
106
107 //获取当前系统时间
108 DateTime GetCurrentTime()
109 {
110 DateTime currentTime = new DateTime();
111 currentTime = DateTime.Now;
112 return currentTime;
113 }
114
115 //发送字符信息到服务端的方法
116 void ClientSendMsg(string sendMsg)
117 {
118 //将输入的内容字符串转换为机器可以识别的字节数组
119 byte[] arrClientSendMsg = Encoding.UTF8.GetBytes(sendMsg);
120 //调用客户端套接字发送字节数组
121 socketclient.Send(arrClientSendMsg);
122 //将发送的信息追加到聊天内容文本框中
123 Debug.WriteLine("hello...." + ": " + GetCurrentTime() + "\r\n" + sendMsg + "\r\n\n");
124 this.txtDebugInfo.AppendText("hello...." + ": " + GetCurrentTime() + "\r\n" + sendMsg + "\r\n\n");
125 }
126
127 private void btnSendMessage_Click(object sender, EventArgs e)
128 {
129 //调用ClientSendMsg方法 将文本框中输入的信息发送给服务端
130 ClientSendMsg(this.txtMessage.Text.Trim());
131 this.txtMessage.Clear();
132 }
133 }
134 }
测试结果截图:
server端:
client端:
代码下载地址
链接:http://pan.baidu.com/s/1kVBUOD5 密码:16ib