从netty-example分析Netty组件续

时间:2023-03-09 13:10:44
从netty-example分析Netty组件续

上文我们从netty-example的Discard服务器端示例分析了netty的组件,今天我们从另一个简单的示例Echo客户端分析一下上个示例中没有出现的netty组件。

1. 服务端的连接处理,读写处理

echo客户端代码:

/**
* Sends one message when a connection is open and echoes back any received
* data to the server. Simply put, the echo client initiates the ping-pong
* traffic between the echo client and server by sending the first message to
* the server.
*/
public final class EchoClient { static final boolean SSL = System.getProperty("ssl") != null;
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "256")); public static void main(String[] args) throws Exception {
// Configure SSL.git
final SslContext sslCtx;
if (SSL) {
sslCtx = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
} // Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
}); // Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync(); // Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}

从上面的代码可以看出,discard的服务端代码和echo的客户端代码基本相似,不同的是一个使用ServerBootStrap,另一个使用BootStrap而已。先看一下连接过程

NioEventLoop处理key的过程,

 private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
} try {
int readyOps = k.readyOps();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops); unsafe.finishConnect();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

2.1 连接流程

调用AbstractNioByteChannel的finishConnect()方法

        @Override
public final void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out. assert eventLoop().inEventLoop(); try {
boolean wasActive = isActive();
doFinishConnect();
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}

触发channelActive操作:

        private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
} // trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess(); // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
} // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}

2.2 读操作流程

调用AbstractNioByteChannel的read()方法,

  典型的autoRead流程如下:

  1. 当socket建立连接时,Netty触发一个inbound事件channelActive,然后提交一个read()请求给本身(参考DefaultChannelPipeline.fireChannelActive())

  2. 接收到read()请求后,Netty从socket读取消息。

  3. 当读取到消息时,Netty触发channelRead()。

  4. 当读取不到消息后,Netty触发ChannelReadCompleted().

  5. Netty提交另外一个read()请求来继续从socket中读取消息。

@Override
public final void read() {
final ChannelConfig config = config();
if (!config.isAutoRead() && !isReadPending()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
removeReadOp();
return;
} final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config); ByteBuf byteBuf = null;
try {
boolean needReadPendingReset = true;
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
break;
} allocHandle.incMessagesRead(1);
if (needReadPendingReset) {
needReadPendingReset = false;
setReadPending(false);
}
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading()); allocHandle.readComplete();
pipeline.fireChannelReadComplete(); if (allocHandle.lastBytesRead() < 0) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, allocHandle.lastBytesRead() < 0, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead() && !isReadPending()) {
removeReadOp();
}
}
}
}

触发读操作

    @Override
public ChannelHandlerContext fireChannelRead(Object msg) {
AbstractChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelRead(next, pipeline.touch(msg, next));
return this;
}

读完触发完成事件

    @Override
public ChannelPipeline fireChannelReadComplete() {
head.fireChannelReadComplete();
if (channel.config().isAutoRead()) {
read();
}
return this;
} @Override
public ChannelHandlerContext fireChannelReadComplete() {
AbstractChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelReadComplete(next);
return this;
}

2.3 写操作流程

写操作

 @SuppressWarnings("deprecation")
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
} final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
} inFlush0 = true; // Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
} try {
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
close(voidPromise(), t, false);
} else {
outboundBuffer.failFlushed(t, true);
}
} finally {
inFlush0 = false;
}
}

写操作具体实现(以NioSocketChannel为例):

 @Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
int size = in.size();
if (size == 0) {
// All written so clear OP_WRITE
clearOpWrite();
break;
}
long writtenBytes = 0;
boolean done = false;
boolean setOpWrite = false; // Ensure the pending writes are made of ByteBufs only.
ByteBuffer[] nioBuffers = in.nioBuffers();
int nioBufferCnt = in.nioBufferCount();
long expectedWrittenBytes = in.nioBufferSize();
SocketChannel ch = javaChannel(); // Always us nioBuffers() to workaround data-corruption.
// See https://github.com/netty/netty/issues/2761
switch (nioBufferCnt) {
case 0:
// We have something else beside ByteBuffers to write so fallback to normal writes.
super.doWrite(in);
return;
case 1:
// Only one ByteBuf so use non-gathering write
ByteBuffer nioBuffer = nioBuffers[0];
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
final int localWrittenBytes = ch.write(nioBuffer);
if (localWrittenBytes == 0) {
setOpWrite = true;
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
break;
default:
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes == 0) {
setOpWrite = true;
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
break;
} // Release the fully written buffers, and update the indexes of the partially written buffer.
in.removeBytes(writtenBytes); if (!done) {
// Did not write all buffers completely.
incompleteWrite(setOpWrite);
break;
}
}
}

2. ChannelInboundHandler和ChannelInboundHandler

Echo的handler代码如下:

/**
* Handler implementation for the echo client. It initiates the ping-pong
* traffic between the echo client and server by sending the first message to
* the server.
*/
public class EchoClientHandler extends ChannelInboundHandlerAdapter { private final ByteBuf firstMessage; /**
* Creates a client-side handler.
*/
public EchoClientHandler() {
firstMessage = Unpooled.buffer(EchoClient.SIZE);
for (int i = 0; i < firstMessage.capacity(); i ++) {
firstMessage.writeByte((byte) i);
}
} @Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(firstMessage);
} @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
} @Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}

上面的代码出现了两个重要的netty组件:ChannelInboundHandlerAdapter和ByteBuf。其中ByteBuf在另一篇文章已经讲到。我们这次重点分析一下    ChannelInboundHandlerAdapter及其相关类。

  ChannelInboundHandlerAdapter继承了ChannelInboundHandler,它的作用是将operation转到ChannelPipeline中的下一个ChannelHandler。子类可以重写一个方法的实现来改变。注意:在方法#channelRead(ChannelHandlerContext, Object)自动返回前,message不会释放。若需要一个可以自动释放接收消息的ChannelInboundHandler实现时,请考虑SimpleChannelInboundHandler。

  ChannelOutboundHandlerAdapter继承了ChannelOutboundHandler,它仅通过调用ChannelHandlerContext跳转到每个方法。

  ChannelInboundHandler处理输入的事件,事件由外部事件源产生,例如从一个socket接收到数据。

  ChannelOutboundHandler解析你自己应用提交的操作。

 2.1 ChannelInboundHandler.channelActive() 

从源码角度看一下,Netty触发一个inbound事件channelActive(以LoggingHandler为例):

   @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "ACTIVE"));
}
ctx.fireChannelActive();
}

触发操作如下:

     @Override
public ChannelHandlerContext fireChannelActive() {
AbstractChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelActive(next);
return this;
} private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}

invokeChannelActive方法实现:

    @Override
public void invokeChannelActive(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeChannelActiveNow(ctx);
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
invokeChannelActiveNow(ctx);
}
});
}
}
public static void invokeChannelActiveNow(final ChannelHandlerContext ctx) {
try {
((ChannelInboundHandler) ctx.handler()).channelActive(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}

2.2 ChannelOutboundHandler.Read()

读的流程:

    @Override
public ChannelHandlerContext read() {
AbstractChannelHandlerContext next = findContextOutbound();
next.invoker().invokeRead(next);
return this;
}

查找outbound的过程:

    private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}

触发读操作:

    @Override
public void invokeRead(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeReadNow(ctx);
} else {
AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
Runnable task = dctx.invokeReadTask;
if (task == null) {
dctx.invokeReadTask = task = new Runnable() {
@Override
public void run() {
invokeReadNow(ctx);
}
};
}
executor.execute(task);
}
}

2.3 ChannelOutboundHandler.write()

以实现类LoggingHandler为例:

    @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "WRITE", msg));
}
ctx.write(msg, promise);
}

具体实现:

    @Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
next.invoker().invokeWrite(next, pipeline.touch(msg, next), promise);
return promise;
}

写操作的触发

    @Override
public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
if (!validatePromise(ctx, promise, true)) {
// promise cancelled
ReferenceCountUtil.release(msg);
return;
} if (executor.inEventLoop()) {
invokeWriteNow(ctx, msg, promise);
} else {
safeExecuteOutbound(WriteTask.newInstance(ctx, msg, promise), promise, msg);
}
}

立刻触发

    public static void invokeWriteNow(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) ctx.handler()).write(ctx, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}

小结:

  Netty中,可以注册多个handler。ChannelInboundHandler按照注册的先后顺序执行;ChannelOutboundHandler按照注册的先后顺序逆序执行,如下图所示,按照注册的先后顺序对Handler进行排序,request进入Netty后的执行顺序为:

从netty-example分析Netty组件续

参考文献

【1】http://blog.csdn.net/u013252773/article/details/21195593

【2】http://*.com/questions/22354135/in-netty4-why-read-and-write-both-in-outboundhandler