Netty 源码 NioEventLoop(三)执行流程

时间:2022-09-05 13:42:46

Netty 源码 NioEventLoop(三)执行流程

Netty 系列目录(https://www.cnblogs.com/binarylei/p/10117436.html)

上文提到在启动 NioEventLoop 线程时会执行 SingleThreadEventExecutor#doStartThread(),在这个方法中调用 SingleThreadEventExecutor.this.run(),NioEventLoop 重写了 run() 方法。NioEventLoop#run() 代码如下:

@Override
protected void run() {
    for (;;) {
        try {
            // 1. select 策略选择
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                // 1.1 非阻塞的 select 策略。实际上,默认情况下,不会返回 CONTINUE 的策略
                case SelectStrategy.CONTINUE:
                    continue;
                // 1.2 阻塞的 select 策略
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                // 1.3 不需要 select,目前已经有可以执行的任务了
                default:
            }

            // 2. 执行网络 IO 事件和任务调度
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    // 2.1. 处理网络 IO 事件
                    processSelectedKeys();
                } finally {
                    // 2.2. 处理系统 Task 和自定义 Task
                    runAllTasks();
                }
            } else {
                // 根据 ioRatio 计算非 IO 最多执行的时间 
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // 3. 关闭线程
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

NioEventLoop#run() 做了记下事情:

  1. 根据 selectStrategy 执行不同的策略
  2. 执行网络 IO 事件和任务调度
  3. 关闭线程

一、IO 轮询策略

当 taskQueue 中没有任务时,那么 Netty 可以阻塞地等待 IO 就绪事件。而当 taskQueue 中有任务时,我们自然地希望所提交的任务可以尽快地执行 ,因此 Netty 会调用非阻塞的 selectNow() 方法,以保证 taskQueue 中的任务尽快可以执行。

(1) hasTasks

首先,在 run 方法中,第一步是调用 hasTasks() 方法来判断当前任务队列中是否有任务

protected boolean hasTasks() {
    assert inEventLoop();
    return !taskQueue.isEmpty();
}

这个方法很简单,仅仅是检查了一下 taskQueue 是否为空。至于 taskQueue 是什么呢,其实它就是存放一系列的需要由此 EventLoop 所执行的任务列表。关于 taskQueue,我们这里暂时不表,等到后面再来详细分析它。

(2) DefaultSelectStrategy

// NioEventLoop#selectNowSupplier
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
};

// 非阻塞的 select 策略。实际上,默认情况下,不会返回 CONTINUE 的策略
SelectStrategy.SELECT = -1;
// 阻塞的 select 策略
SelectStrategy.CONTINUE = -2;

// DefaultSelectStrategy 
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

显然当 taskQueue 为空时,执行的是 select(oldWakenUp) 方法。那么 selectNow() 和 select(oldWakenUp) 之间有什么区别呢? 来看一下,selectNow() 的源码如下

(3) selectNow

int selectNow() throws IOException {
    try {
        return selector.selectNow();
    } finally {
        // restore wakeup state if needed
        if (wakenUp.get()) {
            selector.wakeup();
        }
    }
}

调用 JDK 底层的 selector.selectNow()。selectNow() 方法会检查当前是否有就绪的 IO 事件,如果有,则返回就绪 IO 事件的个数;如果没有,则返回 0。注意,selectNow() 是立即返回的,不会阻塞当前线程。当 selectNow() 调用后,finally 语句块中会检查 wakenUp 变量是否为 true,当为 true 时,调用 selector.wakeup() 唤醒 select() 的阻塞调用。

(4) select(boolean oldWakenUp)

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectedKeys = selector.select(timeoutMillis);
    } catch (CancelledKeyException e) {
    }
}

在这个 select 方法中,调用了 selector.select(timeoutMillis),而这个调用是会阻塞住当前线程的,timeoutMillis是阻塞的超时时间。到来这里,我们可以看到,当 hasTasks() 为真时,调用的的 selectNow() 方法是不会阻塞当前线程的,而当 hasTasks() 为假时,调用的 select(oldWakenUp) 是会阻塞当前线程的。

二、IO 事件的处理

在 NioEventLoop.run() 方法中,第一步是通过 select/selectNow 调用查询当前是否有就绪的 IO 事件,那么当有 IO 事件就绪时,第二步自然就是处理这些 IO 事件啦。首先让我们来看一下 NioEventLoop.run 中循环的剩余部分:

final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
    try {
        // 2.1. 处理网络 IO 事件
        processSelectedKeys();
    } finally {
        // 2.2. 处理系统 Task 和自定义 Task
        runAllTasks();
    }
} else {
    // 根据 ioRatio 计算非 IO 最多执行的时间 
    final long ioStartTime = System.nanoTime();
    try {
        processSelectedKeys();
    } finally {
        final long ioTime = System.nanoTime() - ioStartTime;
        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    }
}

上面列出的代码中,有两个关键的调用:

  • 第一个是 processSelectedKeys():处理准备就绪的 IO 事件;
  • 第二个是 runAllTasks():运行 taskQueue 中的任务。

这里的代码还有一个十分有意思的地方,即 ioRatio。那什么是 ioRatio 呢?它表示的是此线程分配给 IO 操作所占的时间比(即运行 processSelectedKeys 耗时在整个循环中所占用的时间)。例如 ioRatio 默认是 50,则表示 IO 操作和执行 task 的所占用的线程执行时间比是 1 : 1。当知道了 IO 操作耗时和它所占用的时间比,那么执行 task 的时间就可以很方便的计算出来了。

我们这里先分析一下 processSelectedKeys() 方法调用,runAllTasks() 留到下面再分析。processSelectedKeys() 方法的源码如下:

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

由于默认未开启 selectedKeys 优化功能,所以会进入 processSelectedKeysPlain 分支执。下面继续分析 processSelectedKeysPlain 的代码实现。

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    // https://github.com/netty/netty/issues/597
    if (selectedKeys.isEmpty()) {
        return;
    }

    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        final SelectionKey k = i.next();
        final Object a = k.attachment();
        i.remove();

        if (a instanceof AbstractNioChannel) {
            // NioSocketChannel 或 NioServerSocketChannel 进行 IO 读写相关的操作
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            // 用户自行注册的 Task 任务,一般情况下不会执行
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (!i.hasNext()) {
            break;
        }
        // 省略...
    }
}

processSelectedKey 方法源码如下:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    // 省略...

    try {
        int readyOps = k.readyOps();
        // 1. OP_CONNECT 读写前要先处理连接,否则可能抛 NotYetConnectedException 异常
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // 2. OP_WRITE
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // 3. OP_READ 或 OP_ACCEPT
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

这个代码是不是很熟悉啊?完全是 Java NIO 的 Selector 的那一套处理流程嘛!processSelectedKey 中处理了三个
事件,分别是:

  • OP_READ 可读事件,即 Channel 中收到了新数据可供上层读取.
  • OP_WRITE 可写事件,即上层可以向 Channel 写入数据.
  • OP_CONNECT 连接建立事件,即 TCP 连接已经建立,Channel 处于 active 状态.

下面我们分别根据这三个事件来看一下 Netty 是怎么处理的吧。

2.1 OP_READ

当就绪的 IO 事件是 OP_READ,代码会调用 unsafe.read() 方法。unsafe 我们已见过多次,NioSocketChannel 的 Unsafe 是在 AbstractNioByteChannel 中实现的,而 NioServerSocketChannel 的 Unsafe 是在 NioMessageUnsafe 中实现。

public final void read() {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            // 1. 分配缓冲区 ByteBuf
            byteBuf = allocHandle.allocate(allocator);
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            // 2. 从 NioSocketChannel 中读取数据
            if (allocHandle.lastBytesRead() <= 0) {
                // nothing was read. release the buffer.
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    readPending = false;
                }
                break;
            }

            allocHandle.incMessagesRead(1);
            readPending = false;
            // 3. 调用 pipeline.fireChannelRead 发送一个 inbound 事件
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        // See https://github.com/netty/netty/issues/2254
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

上面 read 方法其实归纳起来,可以认为做了如下工作:

  1. 分配 ByteBuf
  2. 从 SocketChannel 中读取数据
  3. 调用 pipeline.fireChannelRead 发送一个 inbound 事件

2.2 OP_WRITE

OP_WRITE 可写事件代码如下。这里代码比较简单,没有详细分析的必要了。

if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
    ch.unsafe().forceFlush();
}

2.3 OP_CONNECT

最后一个事件是 OP_CONNECT,即 TCP 连接已建立事

if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    // 已连接后就需要注销 OP_CONNECT 事件 See https://github.com/netty/netty/issues/924
    int ops = k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    k.interestOps(ops);

    unsafe.finishConnect();
}

OP_CONNECT 事件的处理中,只做了两件事情:

  1. 正如代码中的注释所言, 我们需要将 OP_CONNECT 从就绪事件集中清除, 不然会一直有 OP_CONNECT 事件。

  2. 调用 unsafe.finishConnect() 通知上层连接已建立
    unsafe.finishConnect() 调用最后会调用到 pipeline().fireChannelActive(),产生一个 inbound 事件,通知 pipeline 中的各个 handler TCP 通道已建立(即 ChannelInboundHandler.channelActive 方法会被调用)

到了这里,我们整个 NioEventLoop 的 IO 操作部分已经了解完了,接下来的一节我们要重点分析一下 Netty 的任务
队列机制。

三、任务调度

我们已经提到过,在 Netty 中,一个 NioEventLoop 通常需要肩负起两种任务,第一个是作为 IO 线程,处理 IO 操作;第二个就是作为任务线程,处理 taskQueue 中的任务。这一节的重点就是分析一下 NioEventLoop 的任务队列机制
的。

3.1 普通 Runnable 任务

// SingleThreadEventExecutor
private final Queue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>(maxPendingTasks);
protected void addTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (!offerTask(task)) {
        reject(task);
    }
}
final boolean offerTask(Runnable task) {
    if (isShutdown()) {
        reject();
    }
    return taskQueue.offer(task);
}

因此实际上,taskQueue 是存放着待执行的任务的队列。

3.2 schedule 任务

除了通过 execute 添加普通的 Runnable 任务外,我们还可以通过调用 eventLoop.scheduleXXX 之类的方法来添加
一个定时任务。schedule 功能的实现是在 SingleThreadEventExecutor 的父类,即 AbstractScheduledEventExecutor 中实现的。

// SingleThreadEventExecutor
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

scheduledTaskQueue 是一个队列(Queue),其中存放的元素是 ScheduledFutureTask。而ScheduledFutureTask 我们很容易猜到,它是对 Schedule 任务的一个抽象。我们来看一下 AbstractScheduledEventExecutor 所实现的 schedule 方法:

<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if (inEventLoop()) {
        scheduledTaskQueue().add(task);
    } else {
        execute(new Runnable() {
            @Override
            public void run() {
                scheduledTaskQueue().add(task);
            }
        });
    }

    return task;
}

3.3 执行调度任务

protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;

    do {
        fetchedAll = fetchFromScheduledTaskQueue();
        if (runAllTasksFrom(taskQueue)) {
            ranAtLeastOne = true;
        }
    } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

    if (ranAtLeastOne) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    afterRunningAllTasks();
    return ranAtLeastOne;
}

我们前面已经提到过,EventLoop 可以通过调用 EventLoop.execute 来将一个 Runnable 提交到 taskQueue 中,
也可以通过调用 EventLoop.schedule 来提交一个 schedule 任务到 scheduledTaskQueue 中。在此方法的一开
始调用的 fetchFromScheduledTaskQueue() 其实就是将 scheduledTaskQueue 中已经可以执行的(即定时时
间已到的 schedule 任务) 拿出来并添加到 taskQueue 中,作为可执行的 task 等待被调度执行。代码如下:

private boolean fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    Runnable scheduledTask  = pollScheduledTask(nanoTime);
    while (scheduledTask != null) {
        if (!taskQueue.offer(scheduledTask)) {
            // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
        scheduledTask  = pollScheduledTask(nanoTime);
    }
    return true;
}

接下来 runAllTasks() 方法就会不断调用 task = pollTask() 从 taskQueue 中获取一个可执行的 task,然后调用它
的 run() 方法来运行此 task。

注意: 因为 EventLoop 既需要执行 IO 操作,又需要执行 task,因此我们在调用 EventLoop.execute 方法提交
任务时,不要提交耗时任务,更不能提交一些会造成阻塞的任务,不然会导致我们的 IO 线程得不到调度,影响整
个程序的并发量。


每天用心记录一点点。内容也许不重要,但习惯很重要!