当通道有消息可读时,NioEventLoop线程从阻塞中唤醒并处理SelectionKey, 流程与NIO相似。详细流程如下所示:
2.1 可读事件
有可读事件到达时,workerGroup中的某个NioEventLoop将(从select阻塞中)被唤醒,调用processSelectedKeys方法处理就绪的事件。
private void processSelectedKeys() {
//...
processSelectedKeysOptimized();
}
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
final Object a = k.attachment();
//...
processSelectedKey(k, (AbstractNioChannel) a);
//...
}
}
遍历已就绪的事件,调用processSelectedKey处理。
注意:这里从selectedKey取出的attachment对象是NioSocketChannel通道。
继续跟进processSelectedKey方法:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
try {
int readyOps = k.readyOps();
//... 省略其他分支:SelectionKey.OP_CONNECT和readyOps & SelectionKey.OP_WRITE
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
}
此时,readyOps为SelectionKey.OP_READ,调用unsafe.read()
处理消息。
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
//...
do {
byteBuf = allocHandle.allocate(allocator);
// 读取数据,存入ByteBuf对象
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
// 读取后,向Pipeline触发ChannelRead事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
//...
// 消息处理完,向pipeline触发ChannelReadComplete事件
pipeline.fireChannelReadComplete();
}
逻辑较为清晰:持续调用doReadBytes读取数据至ByteBuf对象中,并将每个读取的ByteBuf以channelRead事件提交到pipeline,全部消息处理完成后,向pipeline提交ChannelReadComplete事件。
这里有个细节需要关注一下,doReadBytes内部使用NIO中的通道将数据流写入到ByteBuf中:
public final int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
try {
return in.read(internalNioBuffer(index, length));
} catch (ClosedChannelException ignored) {
return -1;
}
}
2.2 pipeline处理数据
当数据沿着NioSocketChannel通道的Pipeline传输时,从左到右顺序如下:
Bytebuf类型的消息将沿着解码器Handler->业务Handler->编码器Handler->…->TailContext的顺序处理。
其中HeadContext和TailContext由框架携带,其他Handler由用户根据业务需要开发和引入。
因此,业务Handler中未定义加码器时,第一个处理可读消息的Handler的消息为Bytebuf类型。另外,消息外发时,也需要将业务对象转为Bytebuf类型,才可以正常发出(编码器)。
2.4 扩展
编解码器内容将在Netty系列-7 Netty编解码器中详细介绍。