NIO服务器创建的步骤:
步骤一:打开ServerSocketChannel,用于监听客户端的连接,它是所有客户端连接的父管道:
serverSocketChannel = ServerSocketChannel.open()
步骤二:绑定监听端口,设置连接为非阻塞模式:
serverSocketChannel.configureBlocking(false);//设置连接为非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024)
步骤三:创建Reactor线程,创建多路复用器并启动线程:
selector = Selector.open(); //创建多路复用器Selector
new Thread(new ReactorTask()).start();
步骤四:将ServerSocketChannel注册到Reactor线程的多路复用器Selector上,监听ACCEPT事件:
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
步骤五:多路复用器在线程run方法的无限循环体内轮询准备就绪的Key
步骤六:多路复用器接收到有新的客户端接入,处理新的接入请求,完成TCP三次握手,建立物理链路:
SocketChannel sc = ssc.accept(); //处理新的请求,完成TCP三次握手,建立物理链路
步骤七:设置客户端链路为非阻塞模式
sc.configureBlocking(false); //设置客户端链路为非阻塞模式
步骤八:将新接入的客户端连接注册到Reactor线程的多路复用器上,监听读操作,用来读取客户端发送的网络消息:
//add the new connection to the selector步骤九:异步读取客户端的请求消息到缓冲区:
sc.register(selector, SelectionKey.OP_READ); //将客户端连接注册到多路复用器上,监听读操作,用来监听客户端发送的网络消息
int readBytes = sc.read(readBuffer);
步骤十:对ByteBuffer消息进行编码,如果有半包消息指针reset,继续读取后续的保温,将解码成功的消息封装成Task,投递到业务线程池中,进行业务逻辑编排.
步骤十一:将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将消息异步发送给客户端:
channel.write(writeBuffer);
下面用NIO实现TimeServer:
TimeServer
package com.panther.dong.netty.nio;
/**
* Created by panther on 15-8-26.
*/
public class TimeServer {
public static void main(String[] args) {
int port = 12306;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (Exception e) {
}
}
MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
}
}
MultiplexerTimeServer:
package com.panther.dong.netty.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
/**
* Created by panther on 15-8-26.
*/
public class MultiplexerTimeServer implements Runnable {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private volatile boolean stop;
/**
* 初始化多路复用器、绑定监听端口
*
* @param port
*/
public MultiplexerTimeServer(int port) {
try {
selector = Selector.open(); //创建多路复用器Selector
serverSocketChannel = ServerSocketChannel.open(); //打开ServerSocketChannel,用于监听客户端的连接
serverSocketChannel.configureBlocking(false);//设置连接为非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);//绑定监听端口,设置连接为非阻塞模式
serverSocketChannel
.register(selector, SelectionKey.OP_ACCEPT); //将serversocketchannel注册到多路复用器Selector上,监听ACCEPT事件
System.out.println("The time server is start in port : " + port);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
public void stop() {
this.stop = true;
}
@Override
public void run() {
//多路复用器无限轮询准备就绪的key
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
SelectionKey key = null;
while (iterator.hasNext()) {
key = iterator.next();
handleInput(key); //多路复用器处理ACCEPT事件
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
//处理新接入的请求消息
if (key.isAcceptable()) {
//Accept the new Connection
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept(); //处理新的请求,完成TCP三次握手,建立物理链路
sc.configureBlocking(false); //设置客户端链路为非阻塞模式
//add the new connection to the selector
sc.register(selector, SelectionKey.OP_READ); //将客户端连接注册到多路复用器上,监听读操作,用来监听客户端发送的网络消息
}
if (key.isReadable()) {
//read the data
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("the time server receive order : " + body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ?
new Date(System.currentTimeMillis()).toString() :
"BAD ORDER";
doWrite(sc, currentTime);
} else if (readBytes < 0) {
key.cancel();
sc.close();
} else {
; //读到0字节,忽略
}
}
}
}
private void doWrite(SocketChannel channel, String response) throws IOException {
if (response != null && response.trim().length() > 0) {
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
}
}
}