一、回传协议接口和TCP方式实现:
1.接口:
1
2
3
4
5
6
7
8
|
import java.nio.channels.SelectionKey;
import java.io.IOException;
public interface EchoProtocol {
void handleAccept(SelectionKey key) throws IOException;
void handleRead(SelectionKey key) throws IOException;
void handleWrite(SelectionKey key) throws IOException;
}
|
2.实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
import java.nio.channels.*;
import java.nio.ByteBuffer;
import java.io.IOException;
public class TCPEchoSelectorProtocol implements EchoProtocol{
private int bufSize; // Size of I/O buffer
public EchoSelectorProtocol( int bufSize) {
this .bufSize = bufSize;
}
public void handleAccept(SelectionKey key) throws IOException {
SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
clntChan.configureBlocking( false ); // Must be nonblocking to register
// Register the selector with new channel for read and attach byte buffer
clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
}
public void handleRead(SelectionKey key) throws IOException {
// Client socket channel has pending data
SocketChannel clntChan = (SocketChannel) key.channel();
ByteBuffer buf = (ByteBuffer) key.attachment();
long bytesRead = clntChan.read(buf);
if (bytesRead == - 1 ) { // Did the other end close?
clntChan.close();
} else if (bytesRead > 0 ) {
// Indicate via key that reading/writing are both of interest now.
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
}
public void handleWrite(SelectionKey key) throws IOException {
/*
* Channel is available for writing, and key is valid (i.e., client channel
* not closed).
*/
// Retrieve data read earlier
ByteBuffer buf = (ByteBuffer) key.attachment();
buf.flip(); // Prepare buffer for writing
SocketChannel clntChan = (SocketChannel) key.channel();
clntChan.write(buf);
if (!buf.hasRemaining()) { // Buffer completely written?
//Nothing left, so no longer interested in writes
key.interestOps(SelectionKey.OP_READ);
}
buf.compact(); // Make room for more data to be read in
}
}
|
二、NIO TCP客户端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class TCPEchoClientNonblocking {
public static void main(String args[]) throws Exception {
String server = "127.0.0.1" ; // Server name or IP address
// Convert input String to bytes using the default charset
byte [] argument = "0123456789abcdefghijklmnopqrstuvwxyz" .getBytes();
int servPort = 5500 ;
// Create channel and set to nonblocking
SocketChannel clntChan = SocketChannel.open();
clntChan.configureBlocking( false );
// Initiate connection to server and repeatedly poll until complete
if (!clntChan.connect( new InetSocketAddress(server, servPort))) {
while (!clntChan.finishConnect()) {
System.out.print( "." ); // Do something else
}
}
ByteBuffer writeBuf = ByteBuffer.wrap(argument);
ByteBuffer readBuf = ByteBuffer.allocate(argument.length);
int totalBytesRcvd = 0 ; // Total bytes received so far
int bytesRcvd; // Bytes received in last read
while (totalBytesRcvd < argument.length) {
if (writeBuf.hasRemaining()) {
clntChan.write(writeBuf);
}
if ((bytesRcvd = clntChan.read(readBuf)) == - 1 ) {
throw new SocketException( "Connection closed prematurely" );
}
totalBytesRcvd += bytesRcvd;
System.out.print( "." ); // Do something else
}
System.out.println( "Received: " + // convert to String per default charset
new String(readBuf.array(), 0 , totalBytesRcvd).length());
clntChan.close();
}
}
|
三、NIO TCP服务端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.Iterator;
public class TCPServerSelector {
private static final int BUFSIZE = 256 ; // Buffer size (bytes)
private static final int TIMEOUT = 3000 ; // Wait timeout (milliseconds)
public static void main(String[] args) throws IOException {
int [] ports = { 5500 };
// Create a selector to multiplex listening sockets and connections
Selector selector = Selector.open();
// Create listening socket channel for each port and register selector
for ( int port : ports) {
ServerSocketChannel listnChannel = ServerSocketChannel.open();
listnChannel.socket().bind( new InetSocketAddress(port));
listnChannel.configureBlocking( false ); // must be nonblocking to register
// Register selector with channel. The returned key is ignored
listnChannel.register(selector, SelectionKey.OP_ACCEPT);
}
// Create a handler that will implement the protocol
TCPProtocol protocol = new TCPEchoSelectorProtocol(BUFSIZE);
while ( true ) { // Run forever, processing available I/O operations
// Wait for some channel to be ready (or timeout)
if (selector.select(TIMEOUT) == 0 ) { // returns # of ready chans
System.out.print( "." );
continue ;
}
// Get iterator on set of keys with I/O to process
Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
while (keyIter.hasNext()) {
SelectionKey key = keyIter.next(); // Key is bit mask
// Server socket channel has pending connection requests?
if (key.isAcceptable()) {
System.out.println( "----accept-----" );
protocol.handleAccept(key);
}
// Client socket channel has pending data?
if (key.isReadable()) {
System.out.println( "----read-----" );
protocol.handleRead(key);
}
// Client socket channel is available for writing and
// key is valid (i.e., channel not closed)?
if (key.isValid() && key.isWritable()) {
System.out.println( "----write-----" );
protocol.handleWrite(key);
}
keyIter.remove(); // remove from set of selected keys
}
}
}
}
|
以上就是本文的全部内容,查看更多Java的语法,也希望大家多多支持服务器之家。