Netty5.0的NioEventLoop源码详细分析

时间:2022-09-05 13:42:16

了解Netty线程模型的小伙伴应该都知道,Netty的线程有两个NioEventLoopGroup线程池,一个是boss线程池,一个是worker线程池,其中worker线程池的任务如下:

a.异步读取通讯对端的消息,向ChannelPipeline发出读事件

b.异步向通讯对端发送消息,调用ChannelPipeline发送消息接口

c.执行系统Task任务

d.执行定时任务

 系统Task

通过调用NioEventLoop的execute(Runnable task)方法实现,创建它们的原因是当IO线程和用户线程都在操作同一个资源时,

会发生锁竞争的问题,所以将用户线程封装为一个Task,交给IO线程串行处理,实现局部无锁化

 定时Task

通过调用NioEventLoop的schedule(Runnable command,long delay,TimeUnit unit)实现,主要用于监控和检查等定时动作

所以Netty的NioEventLoop并不是一个纯粹的I/O线程,它还负责调度执行Task任务

 下面看看NioEventLoop的类图

Netty5.0的NioEventLoop源码详细分析

作为NIO框架的Reactor线程,NioEventLoop需要处理网络I/O读写事件,因此它必须聚合一个多路复用器对象--Selector

Netty5.0的NioEventLoop源码详细分析

selector的初始化方法就是直接调用openSelector()方法

Netty5.0的NioEventLoop源码详细分析

 从上图中可以看到,Netty对Selector的selectedKeys进行了优化,用户可以通过io.netty.noKeySetOptimization开关决定

是否启用该优化项,默认不打开优化。如果没有开启该优化,则由provider.openSelector()创建并打开selector之后就直接返回,

如果设置了开启优化,则通过反射机制获取到selectedKeys和publicSelectedKeys,并将这两个属性设为可写,

然后在使用它们将新创建的selectedKeySet与selector绑定,并将新的selectedKeySet将原JDK中的selectedKeys替换。

 上面就是多路复用器Selector的初始化过程,下面研究关键的run()方法。

    protected void run() {
boolean oldWakenUp = this.wakenUp.getAndSet(false);

try {
if (this.hasTasks()) {
this.selectNow();
} else {
this.select(oldWakenUp);
if (this.wakenUp.get()) {
this.selector.wakeup();
}
}

this.cancelledKeys = 0;
this.needsToSelectAgain = false;
int ioRatio = this.ioRatio;
if (ioRatio == 100) {
this.processSelectedKeys();
this.runAllTasks();
} else {
long ioStartTime = System.nanoTime();
this.processSelectedKeys();
long ioTime = System.nanoTime() - ioStartTime;
this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);
}

if (this.isShuttingDown()) {
this.closeAll();
if (this.confirmShutdown()) {
this.cleanupAndTerminate(true);
return;
}
}
} catch (Throwable var8) {
logger.warn("Unexpected exception in the selector loop.", var8);

try {
Thread.sleep(1000L);
} catch (InterruptedException var7) {
;
}
}

this.scheduleExecution();
}
 每次执行先将wakenUp还原为false,并将之前的wakeUp状态保存到oldWakenUp变量中,这样即使进入到后面的select(oldWakenUp)分支,如果有新任务到来,也能及时处理。

boolean oldWakenUp = this.wakenUp.getAndSet(false);

 通过hasTasks()方法判断消息队列当中是否有未处理的任务,如果有则调用selectNow()方法立即进行一次select操作,

看是否有准备就绪的Channel需要处理。

if (this.hasTasks()) {
this.selectNow();
}
 Selector的selectNow()方法会立即触发Selector的选择操作,如果有准备就绪的Channel,则返回就绪Channel的集合,否则返回0。最后再判断用户是否调用了Selector的wakenUp(),如果有,则执行selector.wakeup()

    void selectNow() throws IOException {
try {
this.selector.selectNow();
} finally {
if (this.wakenUp.get()) {
this.selector.wakeup();
}

}

}
回到run()方法继续分析,如果消息队列中没有待处理的消息,则执行select(oldWakenUp)方法

    private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;

try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + this.delayNanos(currentTimeNanos);

while(true) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0L) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}

int selectedKeys = selector.select(timeoutMillis);
++selectCnt;
if (selectedKeys != 0 || oldWakenUp || this.wakenUp.get() || this.hasTasks() || this.hasScheduledTasks()) {
break;
}

if (Thread.interrupted()) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}

selectCnt = 1;
break;
}

long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding selector.", selectCnt);
this.rebuildSelector();
selector = this.selector;
selector.selectNow();
selectCnt = 1;
break;
}

currentTimeNanos = time;
}

if (selectCnt > 3 && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
}
} catch (CancelledKeyException var13) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", var13);
}
}

}
 先取系统的纳秒时间,调用delayNanos()方法计算获得NioEventLoop中定时任务的触发时间,计算下一个将要触发的定时任务的剩余超时时间,将它转换成毫秒,为超时时间增加0.5毫秒的调整值。对剩余的超时时间进行判断,如果需要立即执行或者已经超时,则调用selector.selectNow()进行轮询操作,将selectCnt设置为1,并退出当前循环。

            int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + this.delayNanos(currentTimeNanos);

while(true) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0L) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
 然后将定时操作剩余的超时时间作为参数进行select,每进行一次,就将计数器selectCnt加1,这个是为了下文解决JDK select的bug用的。

int selectedKeys = selector.select(timeoutMillis);
++selectCnt;

Select操作结束之后,需要对结果进行判断,如果存在下列任意一种情况,则break操作

1.有Channel处于就绪状态,即selectedKeys != 0 证明有读写操作需要jinxing

2.oldWakenUp为true

3.系统或者用户调用了wakeup操作,唤醒当前的多路复用器

4.消息队列当中有任务需要执行

if (selectedKeys != 0 || oldWakenUp || this.wakenUp.get() || this.hasTasks() || this.hasScheduledTasks()) {
break;
}
 如果本次Selector的轮询结果为空,也没有wakeup操作或是新的消息需要处理,则说明是个空轮询,在JDK原生的NIO中,这可能触发epoll的bug,它会导致Selector的空轮询,使I/O线程一直处于100%状态。这个问题在Netty中得到了修复,策略如下:

1.对Selector的select操作周期进行统计

2.每完成一次空的select操作进行一个计数

3.在某个周期内如果连续发生了N次(默认为512次)空轮询,说明触发了JDK NIO的epoll()死循环的bug.

                long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding selector.", selectCnt);
this.rebuildSelector();
selector = this.selector;
selector.selectNow();
selectCnt = 1;
break;
}

监测到Selector处于死循环的状态下,会通过重建Selector来解决这个问题

    public void rebuildSelector() {
if (!this.inEventLoop()) {
this.execute(new Runnable() {
public void run() {
NioEventLoop.this.rebuildSelector();
}
});
} else {
Selector oldSelector = this.selector;
if (oldSelector != null) {
Selector newSelector;
try {
newSelector = this.openSelector();
} catch (Exception var9) {
logger.warn("Failed to create a new Selector.", var9);
return;
}

int nChannels = 0;

label69:
while(true) {
try {
Iterator i$ = oldSelector.keys().iterator();

while(true) {
if (!i$.hasNext()) {
break label69;
}

SelectionKey key = (SelectionKey)i$.next();
Object a = key.attachment();

try {
if (key.isValid() && key.channel().keyFor(newSelector) == null) {
int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
((AbstractNioChannel)a).selectionKey = newKey;
}

++nChannels;
}
} catch (Exception var11) {
logger.warn("Failed to re-register a Channel to the new Selector.", var11);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel)a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
NioTask<SelectableChannel> task = (NioTask)a;
invokeChannelUnregistered(task, key, var11);
}
}
}
} catch (ConcurrentModificationException var12) {
;
}
}

this.selector = newSelector;

try {
oldSelector.close();
} catch (Throwable var10) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", var10);
}
}

logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
}
}
 首先通过inEventLoop方法判断是否是其它线程发起的rebuildSelector,如果是其它线程发起的,为了避免多个线程并发操作Selector和其它资源,则需要将rebuildSelector封装成Task,放到NioEventLoop的消息队列中,由NioEventLoop线程负责,这样避免了线程安全问题。

if (!this.inEventLoop()) {
this.execute(new Runnable() {
public void run() {
NioEventLoop.this.rebuildSelector();
}
});
}
 接着通过openSelector新建并打开一个newSelector,通过循环,将原Selector上注册时SocketChannel从旧的Selector上去除注册,并重新注册到新的Selector上,将newSelector赋个NioEventLoop,然后将老的Selector关闭。

通过销毁旧的、有问题的多路复用器,使用新建的Selector,就可以解决空轮询Selector导致的bug。

如果轮询到了处于就绪状态的SocketChannel,则需要处理网络I/O事件

this.cancelledKeys = 0;
this.needsToSelectAgain = false;
int ioRatio = this.ioRatio;
if (ioRatio == 100) {
this.processSelectedKeys();
this.runAllTasks();
} else {
long ioStartTime = System.nanoTime();
this.processSelectedKeys();
long ioTime = System.nanoTime() - ioStartTime;
this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);
}
其中processSelectedKeys()代码如下

    private void processSelectedKeys() {
if (this.selectedKeys != null) {
this.processSelectedKeysOptimized(this.selectedKeys.flip());
} else {
this.processSelectedKeysPlain(this.selector.selectedKeys());
}

}
由于默认没有开启selectedKeys优化,所以会调用processSelectedKeysPlain方法

    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
if (!selectedKeys.isEmpty()) {
Iterator i = selectedKeys.iterator();

while(true) {
SelectionKey k = (SelectionKey)i.next();
Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel)a);
} else {
NioTask<SelectableChannel> task = (NioTask)a;
processSelectedKey(k, task);
}

if (!i.hasNext()) {
break;
}

if (this.needsToSelectAgain) {
this.selectAgain();
selectedKeys = this.selector.selectedKeys();
if (selectedKeys.isEmpty()) {
break;
}

i = selectedKeys.iterator();
}
}

}
}
 先对SelectedKeys进行保护性判断,如果为空则返回。否则获取SelectedKeys迭代器进行循环遍历,获取selectionKey和SocketChannel的附件对象,将已经选择的选择键从迭代器中删除,防止下次被重复选择和处理。

if (!selectedKeys.isEmpty()) {
Iterator i = selectedKeys.iterator();

while(true) {
SelectionKey k = (SelectionKey)i.next();
Object a = k.attachment();
i.remove();
 然后将SocketChannel附件对象进行判断,如果包含AbstractNioChannel,则证明是SocketChannel或者是ServerSocketChannel,需要进行I/O读写相关的操作,否则就是NioTask,需要类型转换为NioTask(由于Netty自身没有实现NioTask)接口,所以通常系统不会执行该分支,除非用户自行注册该Task到多路复用器。

                if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel)a);
} else {
NioTask<SelectableChannel> task = (NioTask)a;
processSelectedKey(k, task);
}
 从代码中可以看到,接下来需要执行processSelectedKey。在该方法中进行I/O操作,首先从NioServerSocketChannel或者NioSocketChannel中获取其内部类Unsafe,判断选择键是否可用,不可用则关闭unsafe,释放连接资源

NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
unsafe.close(unsafe.voidPromise());
}
如果选择键可用,就获取其值跟网络操作位进行与运算。

 else {
try {
int readyOps = k.readyOps();
if ((readyOps & 17) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
return;
}
}

if ((readyOps & 4) != 0) {
ch.unsafe().forceFlush();
}

if ((readyOps & 8) != 0) {
int ops = k.interestOps();
ops &= -9;
k.interestOps(ops);
unsafe.finishConnect();
}
} catch (CancelledKeyException var5) {
unsafe.close(unsafe.voidPromise());
}

}
 如果是读或者连接操作,则调用Unsafe的read方法。此处Unsafe的实现是个多态,对于NioServerSocketChannel,它的读操作就是接受客户端的TCP连接。

    protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = this.javaChannel().accept();

try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable var6) {
logger.warn("Failed to create a new channel from an accepted socket.", var6);

try {
ch.close();
} catch (Throwable var5) {
logger.warn("Failed to close a socket.", var5);
}
}

return 0;
}
对于NIOSocketChannel,它的读操作就是从SocketChannel中读取ByteBuffer

    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
return byteBuf.writeBytes(this.javaChannel(), byteBuf.writableBytes());
}
如果网络操作为写,则证明有半包消息没有发送完,通过调用forceFlush()使其继续发送

                if ((readyOps & 4) != 0) {
ch.unsafe().forceFlush();
}
如果网络操作位为连接状态,则需要对连接结果进行判读
                if ((readyOps & 8) != 0) {
int ops = k.interestOps();
ops &= -9;
k.interestOps(ops);
unsafe.finishConnect();
}
需要注意的是,在进行finishConnect判断之前,需要将网络操作位进行修改,注销掉SelectionKey.OP_CONNECT。

 处理完I/O事件之后,NioEventLoop需要执行非I/O操作的系统Task和定时任务,由于NioEventLoop需要同时处理I/O事件和

非I/O任务,为了保证两者都能得到足够的CPU时间被执行,Netty提供了I/O比例供用户定制。如果I/O操作多于定时任务和Task,

则可以将I/O比例跳大,反之则调小,默认为50%

                long ioTime = System.nanoTime() - ioStartTime;
this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);
即进行runAllTasks

    protected boolean runAllTasks(long timeoutNanos) {
this.fetchFromScheduledTaskQueue();
Runnable task = this.pollTask();
if (task == null) {
return false;
} else {
long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0L;

long lastExecutionTime;
while(true) {
try {
task.run();
} catch (Throwable var11) {
logger.warn("A task raised an exception.", var11);
}

++runTasks;
if ((runTasks & 63L) == 0L) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}

task = this.pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}

this.lastExecutionTime = lastExecutionTime;
return true;
}
}
首先从定时任务消息队列中弹出消息来处理,如果为空,则退出。

        this.fetchFromScheduledTaskQueue();
Runnable task = this.pollTask();
if (task == null) {
return false;
}

 如果有,则循环执行定时任务,并且根据时间戳来判断操作是否已经超过了分配给非I/O操作的超时时间,超过则退出,

默认每经过64次循环则进行一次上述判断。防止由于非I/O任务过多导致I/O操作被长时间阻塞

            while(true) {
try {
task.run();
} catch (Throwable var11) {
logger.warn("A task raised an exception.", var11);
}

++runTasks;
if ((runTasks & 63L) == 0L) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}

task = this.pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
 runAllTasks方法执行完成之后,会判断系统是否进入优雅停机状态,如果处理关闭状态,则需要调用closeAll方法,释放资源,并放NioEventLoop线程退出循环,结束运行

            if (this.isShuttingDown()) {
this.closeAll();
if (this.confirmShutdown()) {
this.cleanupAndTerminate(true);
return;
}
}
 closeAll()方法里会遍历所有的Channel,然后调用它的unsafe().close方法关闭所有链路,释放线程池、ChannelPipeline和ChannelHandler等资源

NioEventLoop的源码分析到此结束,欢迎大家一起讨论。