一、摘要
本篇阐述基于tcp通信协议的异步实现。
二、实验平台
visual studio 2010
三、异步通信实现原理及常用方法
3.1 建立连接
在同步模式中,在服务器上使用accept方法接入连接请求,而在客户端则使用connect方法来连接服务器。相对地,在异步模式下,服务器可以使用beginaccept方法和endaccept方法来完成连接到客户端的任务,在客户端则通过beginconnect方法和endconnect方法来实现与服务器的连接。
beginaccept在异步方式下传入的连接尝试,它允许其他动作而不必等待连接建立才继续执行后面程序。在调用beginaccept之前,必须使用listen方法来侦听是否有连接请求,beginaccept的函数原型为:
1
|
beginaccept(asynccallback asynccallback, ojbect state)
|
参数:
asynccallback:代表回调函数
state:表示状态信息,必须保证state中包含socket的句柄
使用beginaccept的基本流程是:
(1)创建本地终节点,并新建套接字与本地终节点进行绑定;
(2)在端口上侦听是否有新的连接请求;
(3)请求开始接入新的连接,传入socket的实例或者stateojbect的实例。
参考代码:
1
2
3
4
5
6
7
8
|
//定义ip地址
ipaddress local = ipaddress.parse( "127.0,0,1" );
ipendpoint iep = new ipendpoint(local,13000);
//创建服务器的socket对象
socket server = new socket(addressfamily.internetwork,sockettype.stream,protocoltype.tcp);
server.bind(iep);
server.listen(20);
server.beginaccecpt( new asynccallback(accept),server);
|
当beginaccept()方法调用结束后,一旦新的连接发生,将调用回调函数,而该回调函数必须包括用来结束接入连接操作的endaccept()方法。
该方法参数列表为 socket endaccept(iasyncresult iar)
下面为回调函数的实例:
1
2
3
4
5
6
7
|
void accept(iasyncresult iar)
{
//还原传入的原始套接字
socket myserver = (socket)iar.asyncstate;
//在原始套接字上调用endaccept方法,返回新的套接字
socket service = myserver.endaccept(iar);
}
|
至此,服务器端已经准备好了。客户端应通过beginconnect方法和endconnect来远程连接主机。在调用beginconnect方法时必须注册相应的回调函数并且至少传递一个socket的实例给state参数,以保证endconnect方法中能使用原始的套接字。下面是一段是beginconnect的调用:
1
2
3
4
|
socket socket= new socket(addressfamily.internetwork,sockettype.stream,protocoltype.tcp)
ipaddress ip=ipaddress.parse( "127.0.0.1" );
ipendpoint iep= new ipendpoint(ip,13000);
socket.beginconnect(iep, new asynccallback(connect),socket);
|
endconnect是一种阻塞方法,用于完成beginconnect方法的异步连接诶远程主机的请求。在注册了回调函数后必须接收beginconnect方法返回的iasynccreuslt作为参数。下面为代码演示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
void connect(iasyncresult iar)
{
socket client=(socket)iar.asyncstate;
try
{
client.endconnect(iar);
}
catch (exception e)
{
console.writeline(e.tostring());
}
finally
{
}
}
|
除了采用上述方法建立连接之后,也可以采用tcplistener类里面的方法进行连接建立。下面是服务器端对关于tcplistener类使用beginaccetptcpclient方法处理一个传入的连接尝试。以下是使用beginaccetptcpclient方法和endaccetptcpclient方法的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public static void dobeginaccept(tcplistener listner)
{
//开始从客户端监听连接
console.writeline( "waitting for a connection" );
//接收连接
//开始准备接入新的连接,一旦有新连接尝试则调用回调函数doaccepttcpcliet
listner.beginaccepttcpclient( new asynccallback(doaccepttcpcliet), listner);
}
//处理客户端的连接
public static void doaccepttcpcliet(iasyncresult iar)
{
//还原原始的tcplistner对象
tcplistener listener = (tcplistener)iar.asyncstate;
//完成连接的动作,并返回新的tcpclient
tcpclient client = listener.endaccepttcpclient(iar);
console.writeline( "连接成功" );
}
|
代码的处理逻辑为:
(1)调用beginaccetptcpclient方法开开始连接新的连接,当连接视图发生时,回调函数被调用以完成连接操作;
(2)上面doaccepttcpcliet方法通过asyncstate属性获得由beginaccepttcpclient传入的listner实例;
(3)在得到listener对象后,用它调用endaccepttcpclient方法,该方法返回新的包含客户端信息的tcpclient。
beginconnect方法和endconnect方法可用于客户端尝试建立与服务端的连接,这里和第一种方法并无区别。下面看实例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
public void dobeginconnect(iasyncresult iar)
{
socket client=(socket)iar.asyncstate;
//开始与远程主机进行连接
client.beginconnect(serverip[0],13000,requestcallback,client);
console.writeline( "开始与服务器进行连接" );
}
private void requestcallback(iasyncresult iar)
{
try
{
//还原原始的tcpclient对象
tcpclient client=(tcpclient)iar.asyncstate;
//
client.endconnect(iar);
console.writeline( "与服务器{0}连接成功" ,client.client.remoteendpoint);
}
catch (exception e)
{
console.writeline(e.tostring());
}
finally
{
}
}
|
以上是建立连接的两种方法。可根据需要选择使用。
3.2 发送与接受数据
在建立了套接字的连接后,就可以服务器端和客户端之间进行数据通信了。异步套接字用beginsend和endsend方法来负责数据的发送。注意在调用beginsend方法前要确保双方都已经建立连接,否则会出异常。下面演示代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
private static void send(socket handler, string data)
{
// convert the string data to byte data using ascii encoding.
byte [] bytedata = encoding.ascii.getbytes(data);
// begin sending the data to the remote device.
handler.beginsend(bytedata, 0, bytedata.length, 0, new asynccallback(sendcallback), handler);
}
private static void sendcallback(iasyncresult ar)
{
try
{
// retrieve the socket from the state object.
socket handler = (socket)ar.asyncstate;
// complete sending the data to the remote device.
int bytessent = handler.endsend(ar);
console.writeline( "sent {0} bytes to client." , bytessent);
handler.shutdown(socketshutdown.both);
handler.close();
}
catch (exception e)
{
console.writeline(e.tostring());
}
}
|
接收数据是通过beginreceive和endreceive方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
private static void receive(socket client)
{
try
{
// create the state object.
stateobject state = new stateobject();
state.worksocket = client;
// begin receiving the data from the remote device.
client.beginreceive(state.buffer, 0, stateobject.buffersize, 0, new asynccallback(receivecallback), state);
}
catch (exception e)
{
console.writeline(e.tostring());
}
}
private static void receivecallback(iasyncresult ar)
{
try
{
// retrieve the state object and the client socket
// from the asynchronous state object.
stateobject state = (stateobject)ar.asyncstate;
socket client = state.worksocket;
// read data from the remote device.
int bytesread = client.endreceive(ar);
if (bytesread > 0)
{
// there might be more data, so store the data received so far.
state.sb.append(encoding.ascii.getstring(state.buffer, 0, bytesread));
// get the rest of the data.
client.beginreceive(state.buffer, 0, stateobject.buffersize, 0, new asynccallback(receivecallback), state);
}
else
{
// all the data has arrived; put it in response.
if (state.sb.length > 1)
{
response = state.sb.tostring();
}
// signal that all bytes have been received.
receivedone. set ();
}
}
catch (exception e)
{
console.writeline(e.tostring());
}
}
|
上述代码的处理逻辑为:
(1)首先处理连接的回调函数里得到的通讯套接字client,接着开始接收数据;
(2)当数据发送到缓冲区中,beginreceive方法试图从buffer数组中读取长度为buffer.length的数据块,并返回接收到的数据量bytesread。最后接收并打印数据。
除了上述方法外,还可以使用基于networkstream相关的异步发送和接收方法,下面是基于networkstream相关的异步发送和接收方法的使用介绍。
networkstream使用beginread和endread方法进行读操作,使用beginwreite和endwrete方法进行写操作,下面看实例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
static void datahandle(tcpclient client)
{
tcpclient tcpclient = client;
//使用tcpclient的getstream方法获取网络流
networkstream ns = tcpclient.getstream();
//检查网络流是否可读
if (ns.canread)
{
//定义缓冲区
byte [] read = new byte [1024];
ns.beginread(read,0,read.length, new asynccallback(myreadcallback),ns);
}
else
{
console.writeline( "无法从网络中读取流数据" );
}
}
public static void myreadcallback(iasyncresult iar)
{
networkstream ns = (networkstream)iar.asyncstate;
byte [] read = new byte [1024];
string data = "" ;
int recv;
recv = ns.endread(iar);
data = string .concat(data, encoding.ascii.getstring(read, 0, recv));
//接收到的消息长度可能大于缓冲区总大小,反复循环直到读完为止
while (ns.dataavailable)
{
ns.beginread(read, 0, read.length, new asynccallback(myreadcallback), ns);
}
//打印
console.writeline( "您收到的信息是" + data);
}
|
3.3 程序阻塞与异步中的同步问题
.net里提供了eventwaithandle类来表示一个线程的同步事件。eventwaithandle即事件等待句柄,他允许线程通过操作系统互发信号和等待彼此的信号来达到线程同步的目的。这个类有2个子类,分别为autoresteevnt(自动重置)和manualrestevent(手动重置)。下面是线程同步的几个方法:
(1)rset方法:将事件状态设为非终止状态,导致线程阻塞。这里的线程阻塞是指允许其他需要等待的线程进行阻塞即让含waitone()方法的线程阻塞;
(2)set方法:将事件状态设为终止状态,允许一个或多个等待线程继续。该方法发送一个信号给操作系统,让处于等待的某个线程从阻塞状态转换为继续运行,即waitone方法的线程不在阻塞;
(3)waitone方法:阻塞当前线程,直到当前的等待句柄收到信号。此方法将一直使本线程处于阻塞状态直到收到信号为止,即当其他非阻塞进程调用set方法时可以继续执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
public static void startlistening()
{
// data buffer for incoming data.
byte [] bytes = new byte [1024];
// establish the local endpoint for the socket.
// the dns name of the computer
// running the listener is "host.contoso.com".
//iphostentry iphostinfo = dns.resolve(dns.gethostname());
//ipaddress ipaddress = iphostinfo.addresslist[0];
ipaddress ipaddress = ipaddress.parse( "127.0.0.1" );
ipendpoint localendpoint = new ipendpoint(ipaddress, 11000);
// create a tcp/ip socket.
socket listener = new socket(addressfamily.internetwork,sockettype.stream, protocoltype.tcp);
// bind the socket to the local
//endpoint and listen for incoming connections.
try
{
listener.bind(localendpoint);
listener.listen(100);
while ( true )
{
// set the event to nonsignaled state.
alldone.reset();
// start an asynchronous socket to listen for connections.
console.writeline( "waiting for a connection..." );
listener.beginaccept( new asynccallback(acceptcallback),listener);
// wait until a connection is made before continuing.
alldone.waitone();
}
}
catch (exception e)
{
console.writeline(e.tostring());
}
console.writeline( "\npress enter to continue..." );
console.read();
}
|
上述代码的逻辑为:
(1)试用了manualrestevent对象创建一个等待句柄,在调用beginaccept方法前使用rest方法允许其他线程阻塞;
(2)为了防止在连接完成之前对套接字进行读写操作,务必要在beginaccept方法后调用waitone来让线程进入阻塞状态。
当有连接接入后系统会自动调用会调用回调函数,所以当代码执行到回调函数时说明连接已经成功,并在函数的第一句就调用set方法让处于等待的线程可以继续执行。
四、实例
下面是一个实例,客户端请求连接,服务器端侦听端口,当连接建立之后,服务器发送字符串给客户端,客户端收到后并回发给服务器端。
服务器端代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
using system;
using system.net;
using system.net.sockets;
using system.text;
using system.threading;
// state object for reading client data asynchronously
public class stateobject
{
// client socket.
public socket worksocket = null ;
// size of receive buffer.
public const int buffersize = 1024;
// receive buffer.
public byte [] buffer = new byte [buffersize];
// received data string.
public stringbuilder sb = new stringbuilder();
}
public class asynchronoussocketlistener
{
// thread signal.
public static manualresetevent alldone = new manualresetevent( false );
public asynchronoussocketlistener()
{
}
public static void startlistening()
{
// data buffer for incoming data.
byte [] bytes = new byte [1024];
// establish the local endpoint for the socket.
// the dns name of the computer
// running the listener is "host.contoso.com".
//iphostentry iphostinfo = dns.resolve(dns.gethostname());
//ipaddress ipaddress = iphostinfo.addresslist[0];
ipaddress ipaddress = ipaddress.parse( "127.0.0.1" );
ipendpoint localendpoint = new ipendpoint(ipaddress, 11000);
// create a tcp/ip socket.
socket listener = new socket(addressfamily.internetwork,sockettype.stream, protocoltype.tcp);
// bind the socket to the local
//endpoint and listen for incoming connections.
try
{
listener.bind(localendpoint);
listener.listen(100);
while ( true )
{
// set the event to nonsignaled state.
alldone.reset();
// start an asynchronous socket to listen for connections.
console.writeline( "waiting for a connection..." );
listener.beginaccept( new asynccallback(acceptcallback),listener);
// wait until a connection is made before continuing.
alldone.waitone();
}
}
catch (exception e)
{
console.writeline(e.tostring());
}
console.writeline( "\npress enter to continue..." );
console.read();
}
public static void acceptcallback(iasyncresult ar)
{
// signal the main thread to continue.
alldone. set ();
// get the socket that handles the client request.
socket listener = (socket)ar.asyncstate;
socket handler = listener.endaccept(ar);
// create the state object.
stateobject state = new stateobject();
state.worksocket = handler;
handler.beginreceive(state.buffer, 0, stateobject.buffersize, 0, new asynccallback(readcallback), state);
}
public static void readcallback(iasyncresult ar)
{
string content = string .empty;
// retrieve the state object and the handler socket
// from the asynchronous state object.
stateobject state = (stateobject)ar.asyncstate;
socket handler = state.worksocket;
// read data from the client socket.
int bytesread = handler.endreceive(ar);
if (bytesread > 0)
{
// there might be more data, so store the data received so far.
state.sb.append(encoding.ascii.getstring(state.buffer, 0, bytesread));
// check for end-of-file tag. if it is not there, read
// more data.
content = state.sb.tostring();
if (content.indexof( "<eof>" ) > -1)
{
// all the data has been read from the
// client. display it on the console.
console.writeline( "read {0} bytes from socket. \n data : {1}" , content.length, content);
// echo the data back to the client.
send(handler, content);
}
else
{
// not all data received. get more.
handler.beginreceive(state.buffer, 0, stateobject.buffersize, 0, new asynccallback(readcallback), state);
}
}
}
private static void send(socket handler, string data)
{
// convert the string data to byte data using ascii encoding.
byte [] bytedata = encoding.ascii.getbytes(data);
// begin sending the data to the remote device.
handler.beginsend(bytedata, 0, bytedata.length, 0, new asynccallback(sendcallback), handler);
}
private static void sendcallback(iasyncresult ar)
{
try
{
// retrieve the socket from the state object.
socket handler = (socket)ar.asyncstate;
// complete sending the data to the remote device.
int bytessent = handler.endsend(ar);
console.writeline( "sent {0} bytes to client." , bytessent);
handler.shutdown(socketshutdown.both);
handler.close();
}
catch (exception e)
{
console.writeline(e.tostring());
}
}
public static int main( string [] args)
{
startlistening();
return 0;
}
}
|
客户端代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
using system;
using system.net;
using system.net.sockets;
using system.threading;
using system.text;
// state object for receiving data from remote device.
public class stateobject
{
// client socket.
public socket worksocket = null ;
// size of receive buffer.
public const int buffersize = 256;
// receive buffer.
public byte [] buffer = new byte [buffersize];
// received data string.
public stringbuilder sb = new stringbuilder();
}
public class asynchronousclient
{
// the port number for the remote device.
private const int port = 11000;
// manualresetevent instances signal completion.
private static manualresetevent connectdone = new manualresetevent( false );
private static manualresetevent senddone = new manualresetevent( false );
private static manualresetevent receivedone = new manualresetevent( false );
// the response from the remote device.
private static string response = string .empty;
private static void startclient()
{
// connect to a remote device.
try
{
// establish the remote endpoint for the socket.
// the name of the
// remote device is "host.contoso.com".
//iphostentry iphostinfo = dns.resolve("user");
//ipaddress ipaddress = iphostinfo.addresslist[0];
ipaddress ipaddress = ipaddress.parse( "127.0.0.1" );
ipendpoint remoteep = new ipendpoint(ipaddress, port);
// create a tcp/ip socket.
socket client = new socket(addressfamily.internetwork, sockettype.stream, protocoltype.tcp);
// connect to the remote endpoint.
client.beginconnect(remoteep, new asynccallback(connectcallback), client);
connectdone.waitone();
// send test data to the remote device.
send(client, "this is a test<eof>" );
senddone.waitone();
// receive the response from the remote device.
receive(client);
receivedone.waitone();
// write the response to the console.
console.writeline( "response received : {0}" , response);
// release the socket.
client.shutdown(socketshutdown.both);
client.close();
console.readline();
}
catch (exception e)
{
console.writeline(e.tostring());
}
}
private static void connectcallback(iasyncresult ar)
{
try
{
// retrieve the socket from the state object.
socket client = (socket)ar.asyncstate;
// complete the connection.
client.endconnect(ar);
console.writeline( "socket connected to {0}" , client.remoteendpoint.tostring());
// signal that the connection has been made.
connectdone. set ();
}
catch (exception e)
{
console.writeline(e.tostring());
}
}
private static void receive(socket client)
{
try
{
// create the state object.
stateobject state = new stateobject();
state.worksocket = client;
// begin receiving the data from the remote device.
client.beginreceive(state.buffer, 0, stateobject.buffersize, 0, new asynccallback(receivecallback), state);
}
catch (exception e)
{
console.writeline(e.tostring());
}
}
private static void receivecallback(iasyncresult ar)
{
try
{
// retrieve the state object and the client socket
// from the asynchronous state object.
stateobject state = (stateobject)ar.asyncstate;
socket client = state.worksocket;
// read data from the remote device.
int bytesread = client.endreceive(ar);
if (bytesread > 0)
{
// there might be more data, so store the data received so far.
state.sb.append(encoding.ascii.getstring(state.buffer, 0, bytesread));
// get the rest of the data.
client.beginreceive(state.buffer, 0, stateobject.buffersize, 0, new asynccallback(receivecallback), state);
}
else
{
// all the data has arrived; put it in response.
if (state.sb.length > 1)
{
response = state.sb.tostring();
}
// signal that all bytes have been received.
receivedone. set ();
}
}
catch (exception e)
{
console.writeline(e.tostring());
}
}
private static void send(socket client, string data)
{
// convert the string data to byte data using ascii encoding.
byte [] bytedata = encoding.ascii.getbytes(data);
// begin sending the data to the remote device.
client.beginsend(bytedata, 0, bytedata.length, 0, new asynccallback(sendcallback), client);
}
private static void sendcallback(iasyncresult ar)
{
try
{
// retrieve the socket from the state object.
socket client = (socket)ar.asyncstate;
// complete sending the data to the remote device.
int bytessent = client.endsend(ar);
console.writeline( "sent {0} bytes to server." , bytessent);
// signal that all bytes have been sent.
senddone. set ();
}
catch (exception e)
{
console.writeline(e.tostring());
}
}
public static int main( string [] args)
{
startclient();
return 0;
}
}
|
五、实验结果
图1 服务器端界面
图2 客户端界面