Netty源码分析之NioEventLoop执行流程

时间:2022-09-05 13:42:40

NioEventLoop启动触发条件:

1.服务端绑定本地端口

2.新连接接入通过chooser绑定一个NioEventLoop

服务端绑定本地端口

  绑定本地端口,使用下面方法;

ChannelFuture future = bootstrap.bind(host, port).sync();

  最终会调用doBind0()方法:

private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        public void run() {
            if(regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }

        }
    });
}

  这个时候就会调用channel对应NioEventLoop的execute方法,会判断是否在当前的eventloop对应的thread中,如果在,直接向任务队列中添加绑定端口的任务,如果不在,首先要start当前eventLoop对应的thread,再将任务放到任务队列中。这里的excute(task)方法,并不是让线程直接执行它,而是将它放到线程的任务队列中,等待线程去执行它。

public void execute(Runnable task) {
    if(task == null) {
        throw new NullPointerException("task");
    } else {
        boolean inEventLoop = this.inEventLoop();
        if(inEventLoop) {
            this.addTask(task);
        } else {
            this.startThread();
            this.addTask(task);
            if(this.isShutdown() && this.removeTask(task)) {
                reject();
            }
        }

        if(!this.addTaskWakesUp && this.wakesUpForTask(task)) {
            this.wakeup(inEventLoop);
        }

    }
}

NioEventLoop线程执行逻辑

  NioEventLoop对应线程的run方法,run()方法里面是一个死循环,主要的逻辑是首先采用select检查是否有IO事件,如果有IO事件,就采用processSelectedKey()对IO事件进行处理,最后调用runAllTasks()处理任务队列中的任务。

protected void run() {
    while(true) {
        boolean oldWakenUp = this.wakenUp.getAndSet(false);

        try {
            if(this.hasTasks()) {
                this.selectNow();
            } else {
                this.select(oldWakenUp);
                if(this.wakenUp.get()) {
                    this.selector.wakeup();
                }
            }

            this.cancelledKeys = 0;
            this.needsToSelectAgain = false;
            int t = this.ioRatio;
            if(t == 100) {
                this.processSelectedKeys();
                this.runAllTasks();
            } else {
                long e = System.nanoTime();
                this.processSelectedKeys();
                long ioTime = System.nanoTime() - e;
                this.runAllTasks(ioTime * (long)(100 - t) / (long)t);
            }

            if(this.isShuttingDown()) {
                this.closeAll();
                if(this.confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable var8) {
            logger.warn("Unexpected exception in the selector loop.", var8);

            try {
                Thread.sleep(1000L);
            } catch (InterruptedException var7) {
                ;
            }
        }
    }
}

  这段代码中的ioRadio是控制执行IO事件和执行任务队列中的任务的一个事件比,默认是50,代表执行IO事件处理和执行任务队列的任务事件比是1:1。

1)使用select检测IO事件

  通过Selector的select()方法可以选择已经准备就绪的通道 (这些通道包含你感兴趣的的事件)。比如你对读就绪的通道感兴趣,那么select()方法就会返回读事件已经就绪的那些通道。Java中的Selector几个重载的select()方法:

  • int select():阻塞到至少有一个通道在你注册的事件上就绪了。
  • int select(long timeout):和select()一样,但最长阻塞时间为timeout毫秒。
  • int selectNow():非阻塞,只要有通道就绪就立刻返回。

  select()方法返回的int值表示有多少通道已经就绪,是自上次调用select()方法后有多少通道变成就绪状态。之前在select()调用时进入就绪的通道不会在本次调用中被记入,而在前一次select()调用进入就绪但现在已经不在处于就绪的通道也不会被记入。例如:首次调用select()方法,如果有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。

  一旦调用select()方法,并且返回值不为0时,则可以通过调用Selector的selectedKeys()方法来访问已选择键集合 。如下: 

Set selectedKeys=selector.selectedKeys(); 

  Netty中首先判断任务队列是否为空,如果为空的话,就采用select(ltimeout)有超时设置的阻塞方法,如果不为空的话,就调用非阻塞的selectNow()方法,因为即使没有IO事件处理,也可以对任务队列中的任务进行处理。Netty中NioEventLoop的select和selectNow方法其实底层还是依靠selector的select方法。

void selectNow() throws IOException {
    try {
        this.selector.selectNow();
    } finally {
        if(this.wakenUp.get()) {
            this.selector.wakeup();
        }

    }

}

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;

    try {
        int e = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + this.delayNanos(currentTimeNanos);

        while(true) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if(timeoutMillis <= 0L) {
                if(e == 0) {
                    selector.selectNow();
                    e = 1;
                }
                break;
            }

            int selectedKeys = selector.select(timeoutMillis);
            ++e;
            if(selectedKeys != 0 || oldWakenUp || this.wakenUp.get() || this.hasTasks() || this.hasScheduledTasks()) {
                break;
            }

            if(Thread.interrupted()) {
                if(logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }

                e = 1;
                break;
            }

            long time = System.nanoTime();
            if(time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                e = 1;
            } else if(SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && e >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding selector.", Integer.valueOf(e));
                this.rebuildSelector();
                selector = this.selector;
                selector.selectNow();
                e = 1;
                break;
            }

            currentTimeNanos = time;
        }

        if(e > 3 && logger.isDebugEnabled()) {
            logger.debug("Selector.select() returned prematurely {} times in a row.", Integer.valueOf(e - 1));
        }
    } catch (CancelledKeyException var13) {
        if(logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", var13);
        }
    }

}

  可以看到调用selectNow方法是直接调用java nio的select.selectNow方法,而Netty的select方法中有一个参数oldWakeUp记录当前操作是否是唤醒状态(不太清楚这个唤醒状态的作用),每次进行select操作之前,会将其标志位false,表示要进行select操作,而且是未唤醒状态。

  Netty中的select方法首先是根据当前时间时间去计算截止时间,这里使用到了超时队列(超时队列的作用也不太清楚),然后根据截止时间去计算超时时间,如果超时时间小于0,就执行selectNow操作,并退出此次select操作,否则执行带有超时时间的select方法,如果返回的selectKey不等于0,也就是有channel在select上注册了,或者该select操作被唤醒了(?),或者任务队列中有了任务,定时任务队列中有了任务,都会break出来。

  接下来的代码逻辑是避免JDK空轮询的,当JDK发生了空轮训,select会直接返回,这时并没有IO事件到达,也没有超过超时时间,这样会导致线程进入死循环,CPU利用率飙升至100%,JDK到现在也并没有解决这个问题。

  而Netty是通过记录空轮询的次数,如果这个次数达到了一个上限,上限默认是512,那么就新建一个selector,将注册在老selector上的channel注册到新的selector上,并且关闭老的selector,将新的selector替代老的selector。Netty通过rebuildSelector方法重建selector。

public void rebuildSelector() {
    if(!this.inEventLoop()) {
        this.execute(new Runnable() {
            public void run() {
                NioEventLoop.this.rebuildSelector();
            }
        });
    } else {
        Selector oldSelector = this.selector;
        if(oldSelector != null) {
            Selector newSelector;
            try {
                newSelector = this.openSelector();
            } catch (Exception var9) {
                logger.warn("Failed to create a new Selector.", var9);
                return;
            }

            int nChannels = 0;

            label69:
            while(true) {
                try {
                    Iterator t = oldSelector.keys().iterator();

                    while(true) {
                        if(!t.hasNext()) {
                            break label69;
                        }

                        SelectionKey key = (SelectionKey)t.next();
                        Object a = key.attachment();

                        try {
                            if(key.isValid() && key.channel().keyFor(newSelector) == null) {
                                int e = key.interestOps();
                                key.cancel();
                                SelectionKey var14 = key.channel().register(newSelector, e, a);
                                if(a instanceof AbstractNioChannel) {
                                    ((AbstractNioChannel)a).selectionKey = var14;
                                }

                                ++nChannels;
                            }
                        } catch (Exception var11) {
                            logger.warn("Failed to re-register a Channel to the new Selector.", var11);
                            if(a instanceof AbstractNioChannel) {
                                AbstractNioChannel var13 = (AbstractNioChannel)a;
                                var13.unsafe().close(var13.unsafe().voidPromise());
                            } else {
                                NioTask task = (NioTask)a;
                                invokeChannelUnregistered(task, key, var11);
                            }
                        }
                    }
                } catch (ConcurrentModificationException var12) {
                    ;
                }
            }

            this.selector = newSelector;

            try {
                oldSelector.close();
            } catch (Throwable var10) {
                if(logger.isWarnEnabled()) {
                    logger.warn("Failed to close the old Selector.", var10);
                }
            }

            logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
        }
    }
}

2)processSelectedKey()

netty中selectedKey的优化

  通过调用Selector的selectedKeys()方法来访问已选择键集合,此时返回的是HashSet。但是netty是通过反射的方式,将HashSet替换成数组pssSelectedKeysOptimized去处理IO事件。


 

private Selector openSelector() {
    AbstractSelector selector;
    try {
        selector = this.provider.openSelector();
    } catch (IOException var7) {
        throw new ChannelException("failed to open a new selector", var7);
    }

    if(DISABLE_KEYSET_OPTIMIZATION) {
        return selector;
    } else {
        try {
            SelectedSelectionKeySet t = new SelectedSelectionKeySet();
            Class selectorImplClass = Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
            if(!selectorImplClass.isAssignableFrom(selector.getClass())) {
                return selector;
            }

            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
            selectedKeysField.setAccessible(true);
            publicSelectedKeysField.setAccessible(true);
            selectedKeysField.set(selector, t);
            publicSelectedKeysField.set(selector, t);
            this.selectedKeys = t;
            logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
        } catch (Throwable var6) {
            this.selectedKeys = null;
            logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, var6);
        }

        return selector;
    }
}

 

  首先会调用JDK的openSelector方法返回创建的selector,然后会判断是否要对keySet进行优化,通过判断DISABLE_KEYSET_OPTIMIZATION,是否要对keyset进行优化,默认是要对keyset进行优化的。这里的SelectedSelectionKeySet是优化过后的keyset,底层是通过两个数组加上两个数组的大小进行实现的,这样可以使得add操作达到o(1)的时间复杂度(但是是HashSet的add操作时间复杂度不也是o(1))嘛,

 

processSelectedKey调用processSelectedKeysOptimized

  该方法的流程就是遍历数组中所有的selectedKey,一旦遍历完,就将该引用指向为空。获取每一个selectorKey对应的channel,然后通过调用processSelectedKey去处理该channel上感兴趣的事件。

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    int i = 0;
  
  //遍历SelectedKsys while(true) { SelectionKey k = selectedKeys[i]; if(k == null) { return; } selectedKeys[i] = null;
     //获取selectKey对应的channel Object a = k.attachment(); if(a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel)((AbstractNioChannel)a)); } else { NioTask task = (NioTask)a; processSelectedKey(k, (NioTask)task); } if(this.needsToSelectAgain) { while(selectedKeys[i] != null) { selectedKeys[i] = null; ++i; } this.selectAgain(); selectedKeys = this.selectedKeys.flip(); i = -1; } ++i; } }

  这里处理selector上面的IO事件,底层其实都是通过channel的unsafe类进行操作的,这里read和accept事件对应的都是channel的read方法。

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if(!k.isValid()) {
        unsafe.close(unsafe.voidPromise());
    } else {
        try {
            int ignored = k.readyOps();
       //如果是read或者accept事件就对channel进行读操作 if((ignored & 17) != 0 || ignored == 0) { unsafe.read(); if(!ch.isOpen()) { return; } }
       //write事件 if((ignored & 4) != 0) { ch.unsafe().forceFlush(); }
       //connect事件 if((ignored & 8) != 0) { int ops = k.interestOps(); ops &= -9; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException var5) { unsafe.close(unsafe.voidPromise()); } } }

 

3)使用runAllTasks()执行任务队列中的事件

  

PriorityQueue(优先级队列),也是非线程安全的队列。

 

 

private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if(task == null) {
        throw new NullPointerException("task");
    } else {
        if(this.inEventLoop()) {
            this.delayedTaskQueue.add(task);
        } else {
            this.execute(new Runnable() {
                public void run() {
                    SingleThreadEventExecutor.this.delayedTaskQueue.add(task);
                }
            });
        }

        return task;
    }
}

 

  runAllTask首先从定时任务队列中拉取定时任务,将需要执行的定时任务加入到普通任务队列中,并计算截止时间,然后循环的从普通任务队列中拉取任务,并执行任务,这里判断是否到达超时时间,是每相隔64个任务,就判断是否到达最大任务执行时间。为啥要每隔64个任务判断是否超时呢?因为nanoTime也是比较费时的。

protected boolean runAllTasks(long timeoutNanos) {
    this.fetchFromDelayedQueue();
    Runnable task = this.pollTask();
    if(task == null) {
        return false;
    } else {
        long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0L;

        long lastExecutionTime;
        while(true) {
            try {
                task.run();
            } catch (Throwable var11) {
                logger.warn("A task raised an exception.", var11);
            }

            ++runTasks;
            if((runTasks & 63L) == 0L) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if(lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = this.pollTask();
            if(task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
}

  从定时队列中拉取任务,这里拉取的任务是拉取截止时间不超过nanoTime的任务,将任务从定时任务队列中删除,将任务加入到普通任务队列中。这个while循环执行完成之后,所有需要执行的定时任务全部都加入到普通任务队列中。

private void fetchFromDelayedQueue() {
    long nanoTime = 0L;

    while (true) {
        ScheduledFutureTask delayedTask = (ScheduledFutureTask) this.delayedTaskQueue.peek();
        if (delayedTask == null) {
            break;
        }

        if (nanoTime == 0L) {
            nanoTime = ScheduledFutureTask.nanoTime();
        }

        if (delayedTask.deadlineNanos() > nanoTime) {
            break;
        }

        this.delayedTaskQueue.remove();
        this.taskQueue.add(delayedTask);
    }
}

   定时任务队列是一个优先级队列,队列按照优先级进行排序,这里的优先级是每个任务的截止时间,队列是按照截止时间的早晚对任务进行排序的。

public int compareTo(Delayed o) {
    if(this == o) {
        return 0;
    } else {
        ScheduledFutureTask that = (ScheduledFutureTask)o;
        long d = this.deadlineNanos() - that.deadlineNanos();
        if(d < 0L) {
            return -1;
        } else if(d > 0L) {
            return 1;
        } else if(this.id < that.id) {
            return -1;
        } else if(this.id == that.id) {
            throw new Error();
        } else {
            return 1;
        }
    }
}

总结:

1.默认情况下,NioEventLoopGroup会创建2*cpu个数的线程池,在调用NioEventLoop.execute(task)的时候,如果当前的NioEventLoop没有创建自己的线程,就会创建线程。

2.Netty如何解决JDK空轮训bug?通过计算空轮训操作的个数,这里的空轮训的判断是既没有IO事件的到达,也没有达到超时时间,如果空轮训的个数超过阈值(512),就会新建一个selector,将旧selector的selectorKey注册到新的selector上,将旧的selector关闭,用新的selector替代旧的selector。

3.Netty在所有外部线程调用NioEventLoop的操作时,如果通过InEventLoop判断是否在NioEventLoop所属的线程,如果不在通过startThread启动NioEventLoop的线程,并且将任务添加到NioEventLoop的任务队列中,所有NioEventLoop对应一个线程,其中的操作只会被一个线程所执行,实现了异步串行无锁化。