【Netty源码】NioEventLoop源码剖析

时间:2022-09-05 13:42:22

NioEventLoopGroup

1.NioEventLoopGroup的类层次图

【Netty源码】NioEventLoop源码剖析

2.NioEventLoopGroup 实例化过程

【Netty源码】NioEventLoop源码剖析

分析:

  • EventLoopGroup(其实是MultithreadEventExecutorGroup) 内部维护一个类型为 EventExecutor children 数组, 其大小是 nThreads, 这样就构成了一个线程池

  • 如果我们在实例化 NioEventLoopGroup 时, 如果指定线程池大小, 则 nThreads 就是指定的值, 反之是处理器核心数 * 2

  • MultithreadEventExecutorGroup 中会调用 newChild 抽象方法来初始化 children 数组

  • newChild方法重载,初始化EventExecutor时,实际执行的是NioEventLoopGroup中的newChild方法,所以,children元素的实际类型为NioEventLoop。

  • NioEventLoop 属性:

    • SelectorProvider provider 属性: NioEventLoopGroup 构造器中通过SelectorProvider.provider() 获取一个 SelectorProvider
    • Selector selector 属性: NioEventLoop 构造器中通过调用通过 selector = provider.openSelector() 获取一个 selector 对象.

NioEventLoop

1.NioEventLoop的类层次图

【Netty源码】NioEventLoop源码剖析

分析:

  • 类的主要继承结构是:
NioEventLoop -> SingleThreadEventLoop -> SingleThreadEventExecutor -> AbstractScheduledEventExecutor
  • 在 AbstractScheduledEventExecutor 中, Netty 实现了 NioEventLoop 的 schedule 功能, 即我们可以通过调用一个 NioEventLoop 实例的 schedule 方法来运行一些定时任务.

  • 在 SingleThreadEventLoop 中, 又实现了任务队列的功能, 通过它, 我们可以调用一个 NioEventLoop 实例的 execute 方法来向任务队列中添加一个 task, 并由 NioEventLoop 进行调度执行.

2.两种任务

  • NioEventLoop中维护了一个线程,线程启动时会调用NioEventLoop的run方法,执行I/O任务和非I/O任务:

    • I/O任务:selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法触发。
    • 非IO任务:添加到taskQueue中的任务,如register0、bind0等任务,由runAllTasks方法触发。
  • 两种任务的执行时间比由变量ioRatio控制,默认为50,则表示允许非IO任务执行的时间与IO任务的执行时间相等。

3.NioEventLoop.run 方法源码如下

protected void run() {
for (;;) {
boolean oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
selectNow();
} else {
select(oldWakenUp);
if (wakenUp.get()) {
selector.wakeup();
}
}

cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();

processSelectedKeys();

final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}

if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t);

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
}

分析:首先会调用hasTasks()方法判断当前taskQueue是否有元素。如果taskQueue中有元素,执行 selectNow() 方法,最终执行selector.selectNow(),该方法会立即返回。如果taskQueue没有元素,执行 select(oldWakenUp) 方法

4.selectNow方法源码如下

void selectNow() throws IOException {
try {
selector.selectNow();
} finally {
// restore wakup state if needed
if (wakenUp.get()) {
selector.wakeup();
}
}
}

5.select(oldWakenUp)方法源码如下

private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}

int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;

if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}

long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding selector.",
selectCnt);

rebuildSelector();
selector = this.selector;

// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}

currentTimeNanos = time;
}

if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
}
// Harmless exception - log anyway
}
}

分析:

Netty 中的 select ( oldWakenUp) 方法解决了 Nio 中的 bug,即selector的 select 方法导致 cpu100%运行。

我们来看该方法的执行流程:

  • delayNanos(currentTimeNanos):计算延迟任务队列中第一个任务的到期执行时间(即最晚还能延迟多长时间执行),默认返回1s。每个SingleThreadEventExecutor都持有一个延迟执行任务的优先队列PriorityQueue,启动线程时,往队列中加入一个任务。

  • 如果延迟任务队列中第一个任务的最晚还能延迟执行的时间小于500000纳秒,且selectCnt == 0(selectCnt 用来记录selector.select方法的执行次数和标识是否执行过selector.selectNow()),则执行selector.selectNow()方法并立即返回。

  • 否则执行selector.select(timeoutMillis),这个方法已经在深入浅出NIO Socket分析过。

  • 如果已经存在ready的selectionKey,或者selector被唤醒,或者taskQueue不为空,或则scheduledTaskQueue不为空,则退出循环。

  • 如果 selectCnt 没达到阈值SELECTOR_AUTO_REBUILD_THRESHOLD(默认512),则继续进行for循环。其中 currentTimeNanos 在select操作之后会重新赋值当前时间,如果selector.select(timeoutMillis)行为真的阻塞了timeoutMillis,第二次的timeoutMillis肯定等于0,此时selectCnt 为1,所以会直接退出for循环。

  • 如果触发了epool cpu100%的bug,会发生什么?
    selector.select(timeoutMillis)操作会立即返回,不会阻塞timeoutMillis,导致 currentTimeNanos 几乎不变,这种情况下,会反复执行selector.select(timeoutMillis),变量selectCnt 会逐渐变大,当selectCnt 达到阈值,则执行rebuildSelector方法,进行selector重建,解决cpu占用100%的bug。

6.rebuildSelector方法源码如下

public void rebuildSelector() {  
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector();
}
});
return;
}
final Selector oldSelector = selector;
final Selector newSelector;
if (oldSelector == null) {
return;
}
try {
newSelector = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (;;) {
try {
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
if (key.channel().keyFor(newSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
key.channel().register(newSelector, interestOps, a);
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
} catch (ConcurrentModificationException e) {
// Probably due to concurrent modification of the key set.
continue;
}

break;
}
selector = newSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}

分析:

  • 该方法的主要流程是,先通过openSelector方法创建一个新的selector。然后将old selector的selectionKey执行cancel。最后将old selector的channel重新注册到新的selector中。

  • 对selector进行rebuild后,需要重新执行方法selectNow,检查是否有已ready的selectionKey。

  • 方法selectNow()或select(oldWakenUp)返回后,执行方法processSelectedKeys和runAllTasks。

  • 如果 ioRatio 不为100时,方法runAllTasks的执行时间只能为ioTime * (100 - ioRatio) / ioRatio,其中ioTime 是方法processSelectedKeys的执行时间。

7.processSelectedKeys 方法源码

private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}

当selectedKeys != null时,调用processSelectedKeysOptimized方法,源码如下

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
selectedKeys[i] = null;

final Object a = k.attachment();

if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

if (needsToSelectAgain) {
for (;;) {
i++;
if (selectedKeys[i] == null) {
break;
}
selectedKeys[i] = null;
}

selectAgain();
selectedKeys = this.selectedKeys.flip();
i = -1;
}
}
}

分析:

  • 迭代 selectedKeys 获取就绪的 IO 事件的selectkey存放在数组selectedKeys中, 然后为每个事件都调用 processSelectedKey 来处理它.

  • processSelectedKey 中处理了三个事件, 分别是:OP_READ, 可读事件, 即 Channel 中收到了新数据可供上层读取.;OP_WRITE, 可写事件, 即上层可以向 Channel 写入数据.;OP_CONNECT, 连接建立事件, 即 TCP 连接已经建立, Channel 处于 active 状态.

8.runAllTasks 方法源码如下

protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
return false;
}

final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception.", t);
}
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
this.lastExecutionTime = lastExecutionTime;
return true;
}

分析:

  • 该方法首先会调用fetchFromScheduledTaskQueue方法把scheduledTaskQueue中已经超过延迟执行时间的任务移到taskQueue中等待被执行

  • 依次从taskQueue任务task执行,每执行64个任务,进行耗时检查,如果已执行时间超过预先设定的执行时间,则停止执行非IO任务,避免非IO任务太多,影响IO任务的执行。

完整的方法调用流程图

【Netty源码】NioEventLoop源码剖析
画图不易啊,如需使用请注明出处,谢谢!

注:蓝色方框表示类,绿色方框表示方法。

分析:NioEventLoop的启动线程通过SingleThreadEventExecutor的构造方法,初始化一个线程,并在线程内部执行NioEventLoop类的run方法,当然这个线程不会立刻执行。然后使用LinkedBlockingQueue类初始化taskQueue。



本人才疏学浅,若有错,请指出,谢谢!
如果你有更好的建议,可以留言我们一起讨论,共同进步!
衷心的感谢您能耐心的读完本篇博文!

参考链接: 我就是大名鼎鼎的 EventLoop