1.Java NIO服务端创建
首先,我们通过一个时序图来看下如何创建一个NIO服务端并启动监听,接收多个客户端的连接,进行消息的异步读写。
示例代码(参考文献【2】):
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.util.Set; /**
* User: mihasya
* Date: Jul 25, 2010
* Time: 9:09:03 AM
*/
public class JServer {
public static void main (String[] args) {
ServerSocketChannel sch = null;
Selector sel = null; try {
// setup the socket we're listening for connections on.
InetSocketAddress addr = new InetSocketAddress(8400);
sch = ServerSocketChannel.open();
sch.configureBlocking(false);
sch.socket().bind(addr);
// setup our selector and register the main socket on it
sel = Selector.open();
sch.register(sel, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
System.out.println("Couldn't setup server socket");
System.out.println(e.getMessage());
System.exit(1);
} // fire up the listener thread, pass it our selector
ListenerThread listener = new ListenerThread(sel);
listener.run();
} /*
* the thread is completely unnecessary, it could all just happen
* in main()
*/
class ListenerThread extends Thread {
Selector sel = null;
ListenerThread(Selector sel) {
this.sel = sel;
} public void run() {
while (true) { // our canned response for now
ByteBuffer resp = ByteBuffer.wrap(new String("got it\n").getBytes());
try {
// loop over all the sockets that are ready for some activity
while (this.sel.select() > 0) {
Set keys = this.sel.selectedKeys();
Iterator i = keys.iterator();
while (i.hasNext()) {
SelectionKey key = (SelectionKey)i.next();
if (key.isAcceptable()) {
// this means that a new client has hit the port our main
// socket is listening on, so we need to accept the connection
// and add the new client socket to our select pool for reading
// a command later
System.out.println("Accepting connection!");
// this will be the ServerSocketChannel we initially registered
// with the selector in main()
ServerSocketChannel sch = (ServerSocketChannel)key.channel();
SocketChannel ch = sch.accept();
ch.configureBlocking(false);
ch.register(this.sel, SelectionKey.OP_READ);
} else if (key.isReadable()) {
// one of our client sockets has received a command and
// we're now ready to read it in
System.out.println("Accepting command!");
SocketChannel ch = (SocketChannel)key.channel();
ByteBuffer buf = ByteBuffer.allocate(200);
ch.read(buf);
buf.flip();
Charset charset = Charset.forName("UTF-8");
CharsetDecoder decoder = charset.newDecoder();
CharBuffer cbuf = decoder.decode(buf);
System.out.print(cbuf.toString());
// re-register this socket with the selector, this time
// for writing since we'll want to write something to it
// on the next go-around
ch.register(this.sel, SelectionKey.OP_WRITE);
} else if (key.isWritable()) {
// we are ready to send a response to one of the client sockets
// we had read a command from previously
System.out.println("Sending response!");
SocketChannel ch = (SocketChannel)key.channel();
ch.write(resp);
resp.rewind();
// we may get another command from this guy, so prepare
// to read again. We could also close the channel, but
// that sort of defeats the whole purpose of doing async
ch.register(this.sel, SelectionKey.OP_READ);
}
i.remove();
}
}
} catch (IOException e) {
System.out.println("Error in poll loop");
System.out.println(e.getMessage());
System.exit(1);
}
}
}
}
}
从上面的代码可以看出java nio的通用步骤:
1.打开ServerSocketChannel,用于监听客户端的连接,它是所有客户端连接的父通道,绑定监听端口,设置客户端连接方式为非阻塞模式。
2.打开多路复用器并启动服务端监听线程,将ServerSocketChannel注册到Reactor线程的多路复用器Selector上,监听ACCEPT状态。
3.多路复用器监听到有新的客户端接入,处理新的接入请求,完成TCP三次握手后,与客户端建立物理链路。
2. Netty服务端创建
2.1 打开ServerSocketChannel
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
创建一个ServerSocketChannel的过程:
/**
* Create a new instance
*/
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
调用newSocket方法:
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a href="See https://github.com/netty/netty/issues/2308">#2308</a>.
*/
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
其中的provider.openServerSocketChannel()就是java nio的实现。设置非阻塞模式包含在父类中:
/**
* Create a new instance
*
* @param parent the parent {@link Channel} by which this instance was created. May be {@code null}
* @param ch the underlying {@link SelectableChannel} on which it operates
* @param readInterestOp the ops to set to receive data from the {@link SelectableChannel}
*/
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
} throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
2.2 打开多路复用器过程
NioEventLoop负责调度和执行Selector轮询操作,选择准备就绪的Channel集合,相关代码如下:
@Override
protected void run() {
for (;;) {
boolean oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
selectNow();
} else {
select(oldWakenUp); // 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required). if (wakenUp.get()) {
selector.wakeup();
}
} cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
final long ioStartTime = System.nanoTime(); processSelectedKeys(); final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
} if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t); // Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
}
2.2.1 绑定处理的key
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
以processSelectedKeysPlain为例:
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
} Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove(); if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
} if (!i.hasNext()) {
break;
} if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys(); // Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
2.3 绑定端口,接收请求:
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(PORT).sync();
调用bind程序
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
} if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.executor = channel.eventLoop();
}
doBind0(regFuture, channel, localAddress, promise);
}
});
return promise;
}
}
最终的绑定由dobind0来完成
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new OneTimeTask() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
具体实现:
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
} try {
int readyOps = k.readyOps();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops); unsafe.finishConnect();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
参考文献
【1】http://www.infoq.com/cn/articles/netty-server-create
【2】https://github.com/mihasya/sample-java-nio-server/blob/master/src/JServer.java