Netty,Kafka,Muduo关于时间轮的一些思考 之netty时间轮

时间:2021-05-26 15:04:31

无论是网络库,还是各种网络服务器,使用定时器永远是不可避免的,有那么多链接需要管理,不可能为每一个链接都开一个线程,也不可能每一次操作都遍历每一个链接去检查它们是否过期,高效的定时器框架是必须的.

于是而定时器的经典模型时间轮应运而生.

时间轮的模型其实很简单,万变不离其宗.网上有很多资料,这里我不多赘述,今天主要是讲,在我阅读开源项目时,看到的那些时间轮.

首先最简单的,当属netty的时间轮.

我在网上看过一些博客或者文章,有关netty时间轮的,发现有些文章有一个误区,就是netty的连接超时管理是用时间轮完成的.其实不是的,netty的超时链接管理类似于nginx.在eventloop中每一次selector.poll(timeout),这个timeout会取最先超时的链接与现在时间的时间差.比如现在eventloop只管理了一个连接,5S后超时,那么这个poll的timeout就取5S.selector被唤醒要么接收或者发送了新的tcp报文,要么因为5S到了,该处理这个超时连接了,

但是每一次poll醒来,其实并不知道自己因为tcp报文被唤醒还是timeout被唤醒.所以每次醒来还是要检查某个管理链接的超时队列.超时队列其实是以超时时间为key组织的红黑树.所以只要检查前几个最先过期的连接,而不用遍历所有连接.

那时间轮在netty中有什么用?其实在netty的默认实现中,并没有用过时间轮,至少在我看的4.x是这样的.时间轮只是netty这个框架提供给用户的一个辅助工具,上一些源码.

public class HashedWheelTimer implements Timer {
    .....
    .....
    private final Thread workerThread;
    private final HashedWheelBucket[] wheel;
    private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
    private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
首先,关注workerThread.所有提交给WheelTimer的任务都是在这个workerThread中完成的.可以说HashedWheelTimer本身也可以看作一个executor.

HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
    this.timer = timer;
    this.task = task;
    this.deadline = deadline;
}
这个HashedWheelTimeout其实就是对要运行的task以及过期时间的封装.每一次客户调用add,都会加到这个timeouts的队列里,而非时间轮的bucket,之后由这个workerThread自己把这个队列里的task加到相应的bucket中.

接下来上workerThread中具体运行的Runnable

do {
    final long deadline = waitForNextTick();
    if (deadline > 0) {
        int idx = (int) (tick & mask);
        //canceledTimeouts队列里面装得是用户收到调用task.cancel()取消的任务,
        //这里把这些task从相应的位置给remove        
        processCancelledTasks();
        HashedWheelBucket bucket =
                wheel[idx];
        //把timeouts队列中的任务加到buckets中.
        transferTimeoutsToBuckets();
        //执行已经到期或者过期的任务
        bucket.expireTimeouts(deadline);
        tick++;
    }
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
看注释其实已经可以把这个timewheel的逻辑给摸个大概,先移除已经被取消的任务,然后把没被取消的任务加入相依的bucket中,再取出当前时间需要执行bucket,依次执行bucket中的任务.

看一看这个waitForNextTick()

private long waitForNextTick() {
    long deadline = tickDuration * (tick + 1);

    for (;;) {
        final long currentTime = System.nanoTime() - startTime;
        //1ms = 10^6 ns
        //这里+999999,其实是为了上取整
        //然后等到时间>=deadline了开始执行业务逻辑
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

        if (sleepTimeMs <= 0) {
            if (currentTime == Long.MIN_VALUE) {
                return -Long.MAX_VALUE;
            } else {
                return currentTime;
            }
        }

        // Check if we run on windows, as if thats the case we will need
        // to round the sleepTime as workaround for a bug that only affect
        // the JVM if it runs on windows.
        //
        // See https://github.com/netty/netty/issues/356

        try {
            //很朴素用的就是sleep
            Thread.sleep(sleepTimeMs);
        } catch (InterruptedException ignored) {
            if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                return Long.MIN_VALUE;
            }
        }
    }
}
这边的逻辑是考虑到,由于每次执行任务是要时间的.每次sleep的时间要综合考虑上一次执行任务所花费的时间,如果太长了,已经到了下一个bucket的执行时间,就直接返回.如果还没到就sleep相应的时间.

这就是netty的时间轮了,很朴素实现也很简单,所有的实现都在HashTimeWheel这一个文件中.

但是这个实现方式在某些场景下并不适用,比如如果用时间轮来管理超时的连接,那么这个连接上每返回一次select.poll()我就要加入一个timeout到时间轮中,同时cancel有关这个连接的上一个timeout,因为超时的时间变了.如果这个连接的速率很高,selector.poll返回的很频繁,那么就需要频繁地在时间轮中插入任务,取消任务,而且要管理的连接一多那么这个代价是很大的.

怎么办呢?在我下一篇文章中会介绍陈硕老师的网络库,muduo的解决方式.