Netty系列-6 Netty消息处理流程-2.处理消息

时间:2024-10-02 21:36:56

当通道有消息可读时,NioEventLoop线程从阻塞中唤醒并处理SelectionKey, 流程与NIO相似。详细流程如下所示:
在这里插入图片描述

2.1 可读事件

有可读事件到达时,workerGroup中的某个NioEventLoop将(从select阻塞中)被唤醒,调用processSelectedKeys方法处理就绪的事件。

private void processSelectedKeys() {
	//...
	processSelectedKeysOptimized();
}

private void processSelectedKeysOptimized() {
	for (int i = 0; i < selectedKeys.size; ++i) {
		final SelectionKey k = selectedKeys.keys[i];
		selectedKeys.keys[i] = null;
		final Object a = k.attachment();
		//...
		processSelectedKey(k, (AbstractNioChannel) a);
		//...

	}
}

遍历已就绪的事件,调用processSelectedKey处理。
注意:这里从selectedKey取出的attachment对象是NioSocketChannel通道。
继续跟进processSelectedKey方法:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
	final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();

	try {
		int readyOps = k.readyOps();
        //... 省略其他分支:SelectionKey.OP_CONNECT和readyOps & SelectionKey.OP_WRITE

		if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
			unsafe.read();
		}
}

此时,readyOps为SelectionKey.OP_READ,调用unsafe.read()处理消息。

public final void read() {
	final ChannelConfig config = config();
	if (shouldBreakReadReady(config)) {
		clearReadPending();
		return;
	}
	final ChannelPipeline pipeline = pipeline();
	final ByteBufAllocator allocator = config.getAllocator();
	final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
	allocHandle.reset(config);

	ByteBuf byteBuf = null;
	boolean close = false;
	//...
	do {
		byteBuf = allocHandle.allocate(allocator);
		// 读取数据,存入ByteBuf对象
		allocHandle.lastBytesRead(doReadBytes(byteBuf));
		if (allocHandle.lastBytesRead() <= 0) {
			byteBuf.release();
			byteBuf = null;
			close = allocHandle.lastBytesRead() < 0;
			if (close) {
				readPending = false;
			}
			break;
		}

		allocHandle.incMessagesRead(1);
		readPending = false;
		// 读取后,向Pipeline触发ChannelRead事件
		pipeline.fireChannelRead(byteBuf);
		byteBuf = null;
	} while (allocHandle.continueReading());
	//...
	// 消息处理完,向pipeline触发ChannelReadComplete事件
	pipeline.fireChannelReadComplete();
}

逻辑较为清晰:持续调用doReadBytes读取数据至ByteBuf对象中,并将每个读取的ByteBuf以channelRead事件提交到pipeline,全部消息处理完成后,向pipeline提交ChannelReadComplete事件。
这里有个细节需要关注一下,doReadBytes内部使用NIO中的通道将数据流写入到ByteBuf中:

public final int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
    try {
        return in.read(internalNioBuffer(index, length));
    } catch (ClosedChannelException ignored) {
        return -1;
    }
}

2.2 pipeline处理数据

当数据沿着NioSocketChannel通道的Pipeline传输时,从左到右顺序如下:
在这里插入图片描述
Bytebuf类型的消息将沿着解码器Handler->业务Handler->编码器Handler->…->TailContext的顺序处理。
其中HeadContext和TailContext由框架携带,其他Handler由用户根据业务需要开发和引入。
因此,业务Handler中未定义加码器时,第一个处理可读消息的Handler的消息为Bytebuf类型。另外,消息外发时,也需要将业务对象转为Bytebuf类型,才可以正常发出(编码器)。

2.4 扩展

编解码器内容将在Netty系列-7 Netty编解码器中详细介绍。