Netty 源码 NioEventLoop(三)执行流程
Netty 系列目录(https://www.cnblogs.com/binarylei/p/10117436.html)
上文提到在启动 NioEventLoop 线程时会执行 SingleThreadEventExecutor#doStartThread(),在这个方法中调用 SingleThreadEventExecutor.this.run(),NioEventLoop 重写了 run() 方法。NioEventLoop#run() 代码如下:
@Override
protected void run() {
for (;;) {
try {
// 1. select 策略选择
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
// 1.1 非阻塞的 select 策略。实际上,默认情况下,不会返回 CONTINUE 的策略
case SelectStrategy.CONTINUE:
continue;
// 1.2 阻塞的 select 策略
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// 1.3 不需要 select,目前已经有可以执行的任务了
default:
}
// 2. 执行网络 IO 事件和任务调度
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
// 2.1. 处理网络 IO 事件
processSelectedKeys();
} finally {
// 2.2. 处理系统 Task 和自定义 Task
runAllTasks();
}
} else {
// 根据 ioRatio 计算非 IO 最多执行的时间
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// 3. 关闭线程
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
NioEventLoop#run() 做了记下事情:
- 根据 selectStrategy 执行不同的策略
- 执行网络 IO 事件和任务调度
- 关闭线程
一、IO 轮询策略
当 taskQueue 中没有任务时,那么 Netty 可以阻塞地等待 IO 就绪事件。而当 taskQueue 中有任务时,我们自然地希望所提交的任务可以尽快地执行 ,因此 Netty 会调用非阻塞的 selectNow() 方法,以保证 taskQueue 中的任务尽快可以执行。
(1) hasTasks
首先,在 run 方法中,第一步是调用 hasTasks() 方法来判断当前任务队列中是否有任务
protected boolean hasTasks() {
assert inEventLoop();
return !taskQueue.isEmpty();
}
这个方法很简单,仅仅是检查了一下 taskQueue 是否为空。至于 taskQueue 是什么呢,其实它就是存放一系列的需要由此 EventLoop 所执行的任务列表。关于 taskQueue,我们这里暂时不表,等到后面再来详细分析它。
(2) DefaultSelectStrategy
// NioEventLoop#selectNowSupplier
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
// 非阻塞的 select 策略。实际上,默认情况下,不会返回 CONTINUE 的策略
SelectStrategy.SELECT = -1;
// 阻塞的 select 策略
SelectStrategy.CONTINUE = -2;
// DefaultSelectStrategy
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
显然当 taskQueue 为空时,执行的是 select(oldWakenUp) 方法。那么 selectNow() 和 select(oldWakenUp) 之间有什么区别呢? 来看一下,selectNow() 的源码如下
(3) selectNow
int selectNow() throws IOException {
try {
return selector.selectNow();
} finally {
// restore wakeup state if needed
if (wakenUp.get()) {
selector.wakeup();
}
}
}
调用 JDK 底层的 selector.selectNow()。selectNow() 方法会检查当前是否有就绪的 IO 事件,如果有,则返回就绪 IO 事件的个数;如果没有,则返回 0。注意,selectNow() 是立即返回的,不会阻塞当前线程。当 selectNow() 调用后,finally 语句块中会检查 wakenUp 变量是否为 true,当为 true 时,调用 selector.wakeup() 唤醒 select() 的阻塞调用。
(4) select(boolean oldWakenUp)
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectedKeys = selector.select(timeoutMillis);
} catch (CancelledKeyException e) {
}
}
在这个 select 方法中,调用了 selector.select(timeoutMillis),而这个调用是会阻塞住当前线程的,timeoutMillis是阻塞的超时时间。到来这里,我们可以看到,当 hasTasks() 为真时,调用的的 selectNow() 方法是不会阻塞当前线程的,而当 hasTasks() 为假时,调用的 select(oldWakenUp) 是会阻塞当前线程的。
二、IO 事件的处理
在 NioEventLoop.run() 方法中,第一步是通过 select/selectNow 调用查询当前是否有就绪的 IO 事件,那么当有 IO 事件就绪时,第二步自然就是处理这些 IO 事件啦。首先让我们来看一下 NioEventLoop.run 中循环的剩余部分:
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
// 2.1. 处理网络 IO 事件
processSelectedKeys();
} finally {
// 2.2. 处理系统 Task 和自定义 Task
runAllTasks();
}
} else {
// 根据 ioRatio 计算非 IO 最多执行的时间
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
上面列出的代码中,有两个关键的调用:
- 第一个是 processSelectedKeys():处理准备就绪的 IO 事件;
- 第二个是 runAllTasks():运行 taskQueue 中的任务。
这里的代码还有一个十分有意思的地方,即 ioRatio。那什么是 ioRatio 呢?它表示的是此线程分配给 IO 操作所占的时间比(即运行 processSelectedKeys 耗时在整个循环中所占用的时间)。例如 ioRatio 默认是 50,则表示 IO 操作和执行 task 的所占用的线程执行时间比是 1 : 1。当知道了 IO 操作耗时和它所占用的时间比,那么执行 task 的时间就可以很方便的计算出来了。
我们这里先分析一下 processSelectedKeys() 方法调用,runAllTasks() 留到下面再分析。processSelectedKeys() 方法的源码如下:
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
由于默认未开启 selectedKeys 优化功能,所以会进入 processSelectedKeysPlain 分支执。下面继续分析 processSelectedKeysPlain 的代码实现。
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
// NioSocketChannel 或 NioServerSocketChannel 进行 IO 读写相关的操作
processSelectedKey(k, (AbstractNioChannel) a);
} else {
// 用户自行注册的 Task 任务,一般情况下不会执行
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
// 省略...
}
}
processSelectedKey 方法源码如下:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 省略...
try {
int readyOps = k.readyOps();
// 1. OP_CONNECT 读写前要先处理连接,否则可能抛 NotYetConnectedException 异常
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// 2. OP_WRITE
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// 3. OP_READ 或 OP_ACCEPT
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
这个代码是不是很熟悉啊?完全是 Java NIO 的 Selector 的那一套处理流程嘛!processSelectedKey 中处理了三个
事件,分别是:
-
OP_READ
可读事件,即 Channel 中收到了新数据可供上层读取. -
OP_WRITE
可写事件,即上层可以向 Channel 写入数据. -
OP_CONNECT
连接建立事件,即 TCP 连接已经建立,Channel 处于 active 状态.
下面我们分别根据这三个事件来看一下 Netty 是怎么处理的吧。
2.1 OP_READ
当就绪的 IO 事件是 OP_READ,代码会调用 unsafe.read() 方法。unsafe 我们已见过多次,NioSocketChannel 的 Unsafe 是在 AbstractNioByteChannel 中实现的,而 NioServerSocketChannel 的 Unsafe 是在 NioMessageUnsafe 中实现。
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// 1. 分配缓冲区 ByteBuf
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
// 2. 从 NioSocketChannel 中读取数据
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
// 3. 调用 pipeline.fireChannelRead 发送一个 inbound 事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
上面 read 方法其实归纳起来,可以认为做了如下工作:
- 分配 ByteBuf
- 从 SocketChannel 中读取数据
- 调用 pipeline.fireChannelRead 发送一个 inbound 事件
2.2 OP_WRITE
OP_WRITE 可写事件代码如下。这里代码比较简单,没有详细分析的必要了。
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
2.3 OP_CONNECT
最后一个事件是 OP_CONNECT,即 TCP 连接已建立事
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// 已连接后就需要注销 OP_CONNECT 事件 See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
OP_CONNECT 事件的处理中,只做了两件事情:
正如代码中的注释所言, 我们需要将 OP_CONNECT 从就绪事件集中清除, 不然会一直有 OP_CONNECT 事件。
调用 unsafe.finishConnect() 通知上层连接已建立
unsafe.finishConnect() 调用最后会调用到 pipeline().fireChannelActive(),产生一个 inbound 事件,通知 pipeline 中的各个 handler TCP 通道已建立(即 ChannelInboundHandler.channelActive 方法会被调用)
到了这里,我们整个 NioEventLoop 的 IO 操作部分已经了解完了,接下来的一节我们要重点分析一下 Netty 的任务
队列机制。
三、任务调度
我们已经提到过,在 Netty 中,一个 NioEventLoop 通常需要肩负起两种任务,第一个是作为 IO 线程,处理 IO 操作;第二个就是作为任务线程,处理 taskQueue 中的任务。这一节的重点就是分析一下 NioEventLoop 的任务队列机制
的。
3.1 普通 Runnable 任务
// SingleThreadEventExecutor
private final Queue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>(maxPendingTasks);
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}
因此实际上,taskQueue 是存放着待执行的任务的队列。
3.2 schedule 任务
除了通过 execute 添加普通的 Runnable 任务外,我们还可以通过调用 eventLoop.scheduleXXX 之类的方法来添加
一个定时任务。schedule 功能的实现是在 SingleThreadEventExecutor 的父类,即 AbstractScheduledEventExecutor 中实现的。
// SingleThreadEventExecutor
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
scheduledTaskQueue 是一个队列(Queue),其中存放的元素是 ScheduledFutureTask。而ScheduledFutureTask 我们很容易猜到,它是对 Schedule 任务的一个抽象。我们来看一下 AbstractScheduledEventExecutor 所实现的 schedule 方法:
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
execute(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
}
3.3 执行调度任务
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
fetchedAll = fetchFromScheduledTaskQueue();
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}
我们前面已经提到过,EventLoop 可以通过调用 EventLoop.execute 来将一个 Runnable 提交到 taskQueue 中,
也可以通过调用 EventLoop.schedule 来提交一个 schedule 任务到 scheduledTaskQueue 中。在此方法的一开
始调用的 fetchFromScheduledTaskQueue() 其实就是将 scheduledTaskQueue 中已经可以执行的(即定时时
间已到的 schedule 任务) 拿出来并添加到 taskQueue 中,作为可执行的 task 等待被调度执行。代码如下:
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
接下来 runAllTasks() 方法就会不断调用 task = pollTask() 从 taskQueue 中获取一个可执行的 task,然后调用它
的 run() 方法来运行此 task。
注意: 因为 EventLoop 既需要执行 IO 操作,又需要执行 task,因此我们在调用 EventLoop.execute 方法提交
任务时,不要提交耗时任务,更不能提交一些会造成阻塞的任务,不然会导致我们的 IO 线程得不到调度,影响整
个程序的并发量。
每天用心记录一点点。内容也许不重要,但习惯很重要!