在日常开发中存在着调度延时任务、定时任务的需求,而jdk中提供了两种基于内存的任务调度工具,即相对早期的java.util.Timer类和java.util.concurrent中的ScheduledThreadPoolExecutor。
Timer介绍
Timer类其底层基于完全二叉堆实现的优先级队列,使得当前最早应该被执行的任务始终保持在队列头,并能以O(log n)对数的时间复杂度完成任务的入队和出队。
Timer是比较早被引入jdk的,其只支持单线程处理任务,因此如果先被处理的任务比较耗时便会阻塞后续任务的执行,进而导致任务调度不够及时(比如本来10分钟后要执行的任务,可能被前一个耗时的任务拖延到15分钟后才执行)
ScheduledThreadPoolExecutor介绍
Timer调度器的改进版本ScheduledThreadPoolExecutor在jdk1.5中随着juc包一起被引入。
- ScheduledThreadPoolExecutor能够支持调度一次性的延迟任务,和固定频率/固定延迟的定时任务(这两种定时任务的具体区别会在下文展开)
- ScheduledThreadPoolExecutor其底层同样基于二叉堆实现的优先级队列来存储任务的,能做到对数时间复杂度的任务入队和出队。
与Timer不同的是,ScheduledThreadPoolExecutor作为juc下ThreadPoolExecutor的子类拓展,支持多线程并发的处理提交的调度任务。 - 在线程池并发线程数合理且任务执行耗时也合理的情况下,一般不会出现之前被调度的任务阻塞后续任务调度的情况。
但反之,如果同一时间内需要调度的任务过多超过了线程池并发的负荷或者某些任务执行时间过长导致工作线程被长时间占用,则ScheduledThreadPoolExecutor依然无法保证实时的调度。
ScheduledThreadPoolExecutor是建立在二叉堆优先级队列和juc的ThreadPoolExecutor基础之上的,如果对两者工作原理不甚了解的话,会严重影响对ScheduledThreadPoolExecutor的理解。
- 对二叉堆优先级队列原理不太理解的可以参考下我之前的博客:
https://www.cnblogs.com/xiaoxiongcanguan/p/10421560.html (自己动手实现java数据结构(八) 优先级队列) - 对juc的ThreadPoolExecutor原理不太理解的可以参考下我之前的博客:
https://www.cnblogs.com/xiaoxiongcanguan/p/16879296.html (jdk线程池ThreadPoolExecutor工作原理解析(自己动手实现线程池)(一))
https://www.cnblogs.com/xiaoxiongcanguan/p/16901298.html (jdk线程池ThreadPoolExecutor优雅停止原理解析(自己动手实现线程池)(二))
ScheduledThreadPoolExecutor源码分析
在展开分析ScheduledThreadPoolExecutor的源码之前,先思考几个问题。带着问题去阅读源码会更有效率。
- ScheduledThreadPoolExecutor是如何基于ThreadPoolExecutor来实现多线程并发调度的?
- ScheduledThreadPoolExecutor是如何存储任务的,以高效的保证提交的任务能按照其调度时间的先后准确的被依次调度?
- 对于周期性的任务ScheduledThreadPoolExecutor是如何进行调度的?固定频率/固定延迟的周期性任务到底有什么不同?
为了加深理解和添加注释,我基于jdk的ScheduledThreadPoolExecutor自己重新实现了一遍。所有的类名都在jdk类的基础上加上My后缀,便于区分。
1.ScheduledThreadPoolExecutor是如何基于ThreadPoolExecutor来实现多线程并发调度的?
- 我们知道ThreadPoolExecutor线程池中的工作线程会不断尝试从工作队列中拉取任务,并且并发的执行。默认情况下不同任务的优先级是相同的,所以一般的工作队列是先进先出的,即更早提交的任务先入队因而也先被执行。
- ScheduledThreadPoolExecutor作为一个处理调度任务的线程池作为ThreadPoolExecutor的子类拓展了其实现。其中不同任务被执行的优先级并不是基于提交时间的,而是取决于调度任务提交时所指定的执行时间,即执行时间越早的任务越早出队,越早被工作线程拉取并执行。
- ScheduledThreadPoolExecutor的构造方法中不允许外部指定工作队列,而是使用一个专供内部使用的、特殊定制的阻塞队列DelayedWorkQueue(和DelayQueue类似,实现细节在下文展开)。
综上所述,ScheduledThreadPoolExecutor作为ThreadPoolExecutor的子类,大量复用了ThreadPoolExecutor中的逻辑,主要提供了一个定制化的工作队列外就很巧妙地实现了多线程并发的任务调度功能。
MyScheduledThreadPoolExecutor构造方法以及成员属性(成员属性具体的作用在下文展开)
/**
* MyScheduledThreadPoolExecutor
* */
public class MyScheduledThreadPoolExecutor extends MyThreadPoolExecutorV2 implements MyScheduledExecutorService {
/**
* 单调自增发号器,为每一个新创建的ScheduledFutureTask设置一个唯一的序列号
* */
private static final AtomicLong sequencer = new AtomicLong();
/**
* 取消任务时,是否需要将其从延迟队列中移除掉
* True if ScheduledFutureTask.cancel should remove from queue
* */
private volatile boolean removeOnCancel = false;
/**
* False if should cancel/suppress periodic tasks on shutdown.
*/
private volatile boolean continueExistingPeriodicTasksAfterShutdown = false;
/**
* False if should cancel non-periodic tasks on shutdown.
*/
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
/**
* 比父类ThreadPoolExecutor相对受限的构造函数
* */
public MyScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, MyRejectedExecutionHandler handler) {
// 1. 只能使用内部的DelayedWorkQueue作为工作队列。
// DelayedWorkQueue是*队列,只需要指定corePoolSize即可,maximumPoolSize没用(核心线程不够用就全部在队列里积压着等慢慢消费)
// 2. corePoolSize决定了ScheduledThreadPoolExecutor处理任务的及时性。核心线程越多处理任务就越及时,越不容易被非常耗时的任务影响调度的实时性,但也越消耗系统资源。
// 3. keepAliveTime=0,一般来说核心线程是不应该退出的,除非父类里allowCoreThreadTimeOut被设置为true了
// 那样没有任务时核心线程就会立即被回收了(keepAliveTime=0, allowCoreThreadTimeOut=true)
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new MyDelayedWorkQueue(), threadFactory, handler);
}
}
2.ScheduledThreadPoolExecutor是如何存储任务的,以高效的保证提交的任务能按照其调度时间的先后准确的被依次调度?
- DelayedWorkQueue是ScheduledThreadPoolExecutor内部专门定制的工作队列,其实现了BlockingQueue接口,底层基于数组实现的完全二叉堆来存储任务对象。
- 队列存储的任务对象也是ScheduledThreadPoolExecutor中专门定制的ScheduledFutureTask类,其中包含了一个关键成员属性time,标识着该任务应该被何时调度的绝对时间戳(单位nanos)。
同时其compareTo方法中,保证被调度时间越早的任务其比较时值就越小。如果time完全一样的话则基于全局发号器分配的序列号进行比较,序列号越小的说明越早入队则排在队列的前面。
MyScheduledFutureTask实现(省略了一些与当前内容无关的逻辑)
/**
* 调度任务对象
* */
private class MyScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V>{
/**
* 用于保证相同延迟时间的任务,其FIFO(先提交的先执行)的特性
* */
private final long sequenceNumber;
/**
* 当前任务下一次执行的时间(绝对时间,单位纳秒nanos)
* */
private long time;
/**
* 需要重复执行的任务使用的属性
* 1 period>0,说明该任务是一个固定周期重复执行的任务(通过scheduleAtFixedRate方法提交)
* 2 period<0,说明该任务是一个固定延迟重复执行的任务(通过scheduleWithFixedDelay方法提交)
* 3 period=0,说明该任务是一个一次性执行的任务(通过schedule方法提交)
* */
private final long period;
/**
* 定期任务实际执行的具体任务
* */
RunnableScheduledFuture<V> outerTask = this;
/**
* 基于二叉堆的延迟队列中的数组下标,用于快速的查找、定位
* */
int heapIndex;
/**
* 一次性任务的构造函数(one action)
* */
MyScheduledFutureTask(Runnable runnable, V result, long ns) {
super(runnable, result);
// 下一次执行的时间
this.time = ns;
// 非周期性任务,period设置为0
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
/**
* 周期性任务的构造函数
*/
MyScheduledFutureTask(Runnable runnable, V result, long ns, long period) {
super(runnable, result);
// 下一次执行的时间
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
@Override
public boolean isPeriodic() {
// period为0代表是一次性任务
// period部位0代表是周期性任务
return period != 0;
}
/**
* 获得下一次执行的时间
* */
@Override
public long getDelay(TimeUnit unit) {
// 获得time属性与当前时间之差
long delay = time - System.nanoTime();
// 基于参数unit转换
return unit.convert(delay, NANOSECONDS);
}
/**
* 用于延迟队列中的优先级队列的大小比较
* 基于time比较
* 1. time越小,值越大(越早应该被调度执行的任务,越靠前)
* 2. time相等就进一步比较sequenceNumber(调度时间一致的)
* */
@Override
public int compareTo(Delayed other) {
if (other == this) {
// 同一个对象是相等的,返回0
return 0;
}
if (other instanceof MyScheduledFutureTask) {
// 同样是ScheduledFutureTask
MyScheduledFutureTask<?> x = (MyScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0) {
// 当前对象延迟时间更小,返回-1
return -1;
} else if (diff > 0) {
// 当前对象延迟时间更大,返回1
return 1;
} else if (sequenceNumber < x.sequenceNumber) {
// 延迟时间相等,比较序列号
// 当前对象序列号更小,需要排更前面返回-1
return -1;
} else {
// 当前对象序列号更大,返回1
return 1;
}
}else{
// 不是ScheduledFutureTask,通过getDelay比较延迟时间
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
// return (diff < 0) ? -1 : (diff > 0) ? 1 : 0
if(diff < 0){
// 当前对象延迟时间小,返回-1
return -1;
}else if(diff > 0){
// 当前对象延迟时间大,返回1
return 1;
}else{
// 延迟时间相等返回0
return 0;
}
}
}
}
- 完全二叉堆中的所有元素存储时保持全局的堆序性,这样当前被调度时间最早的的任务就能放在DelayedWorkQueue的队列头中,最先被工作线程take/poll时获取到。
- DelayedWorkQueue顾名思义,本质上还是一个DelayQueue延时队列,而延时队列一定是阻塞队列。延时队列与一般的阻塞队列相比有两个最重要的区别:
- 所存储的元素都实现了java.util.concurrent.Delayed接口,按照getDelay获得到的延迟时间的大小排序,值越小的在队列中越靠前(前面已经提到这个是基于完全二叉堆实现的)
- 在有消费者需要获取队头元素时,即使队列不为空但队头元素getDelay>0时也不会返回队头元素,而是被当做空队列来对待。如果是阻塞等待的话,则在队列头元素getDelay<=0时再唤醒阻塞等待的消费者。
上面延迟队列的第二个特点保证了ScheduledThreadPoolExecutor中的工作线程既不会在不正确的时间过早的对任务进行调度,也不会在当前时间未满足任务调度条件下空转而节约CPU(因为工作线程会被阻塞)
- DelayedWorkQueue中有一把全局锁(成员变量ReentrantLock lock),绝大多数操作都必须在锁的保护下才能进行。目的是为了避免提交任务入队、消费任务出队等操作时出现并发而引起bug。
DelayedWorkQueue的实现机制基本上等价于juc包下的PriorityQueue加DelayQueue。如果可以的话建议读者在理解了PriorityQueue、DelayQueue原理之后再来学习其工作机制,循序渐进而事半功倍。
很多实现的小细节都在MyDelayedWorkQueue中有详细的注释(比如二叉堆的插入、删除,以及延迟队列中getDelay值更小的任务入队时应该怎么处理等)。
MyDelayedWorkQueue实现(删减掉了一些非核心的逻辑)
/**
* 为调度任务线程池ScheduledThreadPoolExecutor专门定制的工作队列
* 1.基于完全二叉堆结构,令执行时间最小(最近)的任务始终位于堆顶(即队列头) ==> 小顶堆
* 2.实现上综合了juc包下的DelayQueue和PriorityQueue的功能,并加上了一些基于ScheduledThreadPoolExecutor的一些逻辑
* 建议读者在理解了PriorityQueue、DelayQueue原理之后再来学习其工作机制,循序渐进而事半功倍
* */
static class MyDelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
/**
* 完全二叉堆底层数组的初始容量
* */
private static final int INITIAL_CAPACITY = 16;
/**
* 完全二叉堆底层数组
* */
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
/**
* 互斥锁,用于入队等操作时的并发控制
* */
private final ReentrantLock lock = new ReentrantLock();
/**
* 队列中任务元素的数量
* */
private int size = 0;
/**
* 等待执行队列头(最早应该执行的)任务的线程池工作线程
* 为什么会引入一个这个呢?是为了减少其它线程take获取新任务时不必要的等待
* 因为额外引入了一个操作系统层面的定时器,await带超时时间比无限时间的await性能要差一些
* */
private Thread leader = null;
/**
* 当一个队列头部的任务可以被执行时,通知等待在available上的工作线程
* */
private final Condition available = lock.newCondition();
// ============================= 内部private的辅助方法 ================================
private void setIndex(RunnableScheduledFuture<?> f, int index) {
if (f instanceof MyScheduledFutureTask) {
// 如果任务对象是MyScheduledFutureTask类型,而不仅仅是RunnableScheduledFuture
// 则设置index属性便于加速查找
((MyScheduledFutureTask<?>) f).heapIndex = index;
}
}
/**
* 第k位的元素在二叉堆中上滤(小顶堆:最小的元素在堆顶)
* Call only when holding lock.
*
* 当新元素插入完全二叉堆时,我们直接将其插入向量末尾(堆底最右侧),此时新元素的优先级可能会大于其双亲元素甚至祖先元素,破坏了堆序性,
* 因此我们需要对插入的新元素进行一次上滤操作,使完全二叉堆恢复堆序性。
* 由于堆序性只和双亲和孩子节点相关,因此堆中新插入元素的非祖先元素的堆序性不会受到影响,上滤只是一个局部性的行为。
* */
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
// 获得第k个节点逻辑上的双亲节点
// 0
// 1 2
// 3 4 5 6
// (下标减1再除2,比如下标为5和6的元素逻辑上的parent就是下标为2的元素)
int parent = (k - 1) >>> 1;
// 拿到双亲节点对应的元素
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0) {
// 如果当前需要上滤的元素key,其值大于或等于双亲节点就停止上滤过程(小顶堆)
break;
}
// 当前上滤的元素key,其值小于双亲节点
// 将双亲节点换下来,到第k位上(把自己之前的位置空出来)
queue[k] = e;
// 设置被换下来的双亲节点的index值
setIndex(e, k);
// 令k下标变为更小的parent,继续尝试下一轮上滤操作
k = parent;
}
// 上滤判断结束后,最后空出来的parent的下标值对应的位置存放上滤的元素key
queue[k] = key;
// 设置key节点的index值
setIndex(key, k);
}
/**
* 第k位的元素在二叉堆中下滤(小顶堆:最小的元素在堆顶)
* Call only when holding lock.
*
* 当优先级队列中极值元素出队时,需要在满足堆序性的前提下,选出新的极值元素。
* */
private void siftDown(int k, RunnableScheduledFuture<?> key) {
// half为size的一半
int half = size >>> 1;
// k小于half才需要下滤,大于half说明第k位元素已经是叶子节点了,不需要继续下滤了
while (k < half) {
// 获得第k位元素逻辑上的左孩子节点的下标
int child = (k << 1) + 1;
// 获得左孩子的元素
RunnableScheduledFuture<?> c = queue[child];
// 获得第k位元素逻辑上的右孩子节点的下标
int right = child + 1;
// right没有越界,则比较左右孩子值的大小
if (right < size && c.compareTo(queue[right]) > 0) {
// 左孩子大于右孩子,所以用右孩子和key比较,c=右孩子节点
// (if条件不满足,则用左孩子和key比较,c=左孩子节点)
c = queue[child = right];
}
// key和c比较,如果key比左右孩子都小,则结束下滤
if (key.compareTo(c) <= 0) {
break;
}
// key大于左右孩子中更小的那个,则第k位换成更小的那个孩子(保证上层的节点永远小于其左右孩子,保证堆序性)
queue[k] = c;
// 设置被换到上层去的孩子节点的index的值
setIndex(c, k);
// 令下标k变大为child,在循环中尝试更下一层的下滤操作
k = child;
}
// 结束了下滤操作,最后将元素key放到最后被空出来的孩子节点原来的位置
queue[k] = key;
// 设置key的index值
setIndex(key, k);
}
/**
* 二叉堆扩容
* Call only when holding lock.
*/
private void grow() {
int oldCapacity = queue.length;
// 在原有基础上扩容50%
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
if (newCapacity < 0) {
// 处理扩容50%后整型溢出
newCapacity = Integer.MAX_VALUE;
}
// 将原来数组中的数组数据复制到新数据
// 令成员变量queue指向扩容后的新数组
queue = Arrays.copyOf(queue, newCapacity);
}
/**
* 查询x在二叉堆中的数组下标
* @return 找到了就返回具体的下标,没找到返回-1
* */
private int indexOf(Object x) {
if(x == null){
// 为空,直接返回-1
return -1;
}
if (x instanceof MyScheduledFutureTask) {
int i = ((MyScheduledFutureTask) x).heapIndex;
// 为什么不直接以x.heapIndex为准?
// 因为可能对象x来自其它的线程池,而不是本线程池的
// (先判断i是否合法,然后判断第heapIndex个是否就是x)
if (i >= 0 && i < size && queue[i] == x) {
// 比对一下第heapIndex项是否就是x,如果是则直接返回
return i;
}else{
// heapIndex不合法或者queue[i] != x不相等,说明不是本线程池的任务对象,返回-1
return -1;
}
} else {
// 非ScheduledFutureTask,从头遍历到尾进行线性的检查
for (int i = 0; i < size; i++) {
// 如果x和第i个相等,则返回i
if (x.equals(queue[i])) {
return i;
}
}
// 遍历完了整个queue都没找到,返回-1
return -1;
}
}
// ============================= 实现接口定义的方法 ======================================
@Override
public boolean contains(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 不为-1就是存在,返回true
// 反之就是不存在,返回false
return indexOf(x) != -1;
} finally {
lock.unlock();
}
}
@Override
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(x);
if (i < 0) {
// x不存在,直接返回
return false;
}
// x存在,先将其index设置为-1
setIndex(queue[i], -1);
// 二叉堆元素数量自减1
int s = --size;
// 将队列最尾端的元素取出
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
// s == i,说明被删除的是最尾端的元素,移除后没有破坏堆序性,直接返回即可
if (s != i) {
// 将队列最尾端的元素放到被移除元素的位置,进行一次下滤
siftDown(i, replacement);
if (queue[i] == replacement) {
// 这里为什么还要上滤一次呢?其实是最尾端的元素replacement在放到第i个位置上执行下滤后,其虽然保证了小于其左右孩子节点,但依然可能大于其双亲节点
// 举个例子:
// 0
// 10 1
// 20 30 2 3
// 40 50 60 70 4 5 6 7
// 如果删除第3排第一个的20,则siftDown后会变成:
// 0
// 10 1
// 7 30 2 3
// 40 50 60 70 4 5 6
// replacement=7是小于其双亲节点10的,因此需要再进行一次上滤,使得最终结果为:
// 0
// 7 1
// 10 30 2 3
// 40 50 60 70 4 5 6
// 这种需要上滤的情况是相对特殊的,只有当下滤只有这个节点没有动(即下滤后queue[i] == replacement)
// 因为这种情况下replacement不进行上滤的话**可能**小于其双亲节点,而违反了堆序性(heap invariant)
// 而如果下滤后移动了位置(queue[i] != replacement),则其必定大于其双亲节点,因此不需要尝试上滤了
siftUp(i, replacement);
// 额外的:
// 最容易理解的实现删除堆中元素的方法是将replacement置于堆顶(即第0个位置),进行一次时间复杂度为O(log n)的完整下滤而恢复堆序性
// 但与ScheduledThreadExecutor在第i个位置上进行下滤操作的算法相比其时间复杂度是更高的
// jdk的实现中即使下滤完成后再进行一次上滤,其**最差情况**也与从堆顶开始下滤的算法的性能一样。虽然难理解一些但却是更高效的堆元素删除算法
// 在remove方法等移除队列中间元素时,会比从堆顶直接下滤效率高
}
}
return true;
} finally {
lock.unlock();
}
}
/**
* 入队操作
* */
@Override
public boolean offer(Runnable x) {
if (x == null) {
throw new NullPointerException();
}
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length) {
// 容量不足,扩容
grow();
}
size = i + 1;
if (i == 0) {
// 队列此前为空,第0位设置就ok了
queue[0] = e;
setIndex(e, 0);
} else {
// 队列此前不为空,加入队尾进行一次上滤,恢复堆序性
siftUp(i, e);
}
// 插入堆后,发现自己是队列头(最早要执行的任务)
if (queue[0] == e) {
// 已经有新的队列头任务了,leader设置为空
leader = null;
// 通知take时阻塞等待获取新任务的工作线程
available.signal();
}
} finally {
lock.unlock();
}
// *队列,入队一定成功
return true;
}
/**
* 队列元素f出队操作(修改size等数据,并且恢复f移除后的堆序性)
* */
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
// size自减1
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0) {
// 由于队列头元素出队了,把队尾元素放到队头进行一次下滤,以恢复堆序性
siftDown(0, x);
}
// 被移除的元素f,index设置为-1
setIndex(f, -1);
// 返回被移除队列的元素
return f;
}
/**
* 出队操作(非阻塞)
* */
@Override
public RunnableScheduledFuture<?> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture<?> first = queue[0];
if (first == null || first.getDelay(NANOSECONDS) > 0) {
// 如果队列为空或者队头元素没到需要执行的时间点(delay>0),返回null
return null;
} else {
// 返回队列头元素,并且恢复堆序性
return finishPoll(first);
}
} finally {
lock.unlock();
}
}
@Override
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// take是可响应中断的
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null) {
// 队列为空,await等待(可响应中断)
available.await();
} else {
// 队列不为空
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0) {
// 队列头的元素delay<=0,到可执行的时间点了,返回即可
return finishPoll(first);
}
// first设置为null,便于await期间提早gc这个临时变量
first = null; // don't retain ref while waiting
if (leader != null){
// leader不为空,说明已经有别的线程在take等待了,await无限等待
// (不带超时时间的await性能更好一些,队列头元素只需要由leader线程来获取就行,其它的线程就等leader处理完队头任务后将其唤醒)
available.await();
} else {
// leader为空,说明之前没有别的线程在take等待
Thread thisThread = Thread.currentThread();
// 令当前线程为leader
leader = thisThread;
try {
// leader await带超时时间(等待队列头的任务delay时间,确保任务可执行时第一时间被唤醒去执行)
available.awaitNanos(delay);
} finally {
if (leader == thisThread) {
// take方法退出,当前线程不再是leader了
leader = null;
}
}
}
}
}
} finally {
if (leader == null && queue[0] != null) {
// leader为空,且队列不为空(比如leader线程被唤醒后,通过finishPoll已经获得了之前的队列头元素)
// 尝试唤醒之前阻塞等待的那些消费者线程
available.signal();
}
lock.unlock();
}
}
@Override
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// pool是可响应中断的
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null) {
// 队列为空
if (nanos <= 0) {
// timeout等待时间超时了,返回null(一般不是第一次循环)
return null;
} else {
// 队列元素为空,等待timeout
nanos = available.awaitNanos(nanos);
}
} else {
// 队列不为空
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0) {
// delay<=0,队头元素满足出队条件
return finishPoll(first);
}
if (nanos <= 0) {
// 队列不为空,但是timeout等待时间超时了,返回null(一般不是第一次循环)
return null;
}
// first设置为null,便于await期间提早gc这个临时变量
first = null; // don't retain ref while waiting
if (nanos < delay || leader != null) {
// poll指定的等待时间小于队头元素delay的时间,或者leader不为空(之前已经有别的线程在等待了捞取任务了)
// 最多等待到timeout
nanos = available.awaitNanos(nanos);
} else {
// 队头元素delay的时间早于waitTime指定的时间,且此前leader为null
// 当前线程成为leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待delay时间
long timeLeft = available.awaitNanos(delay);
// 醒来后,nanos自减
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread) {
leader = null;
}
}
}
}
}
} finally {
if (leader == null && queue[0] != null) {
// leader为空,且队列不为空(比如leader线程被唤醒后,通过finishPoll已经获得了之前的队列头元素)
// 尝试唤醒之前阻塞等待的那些消费者线程
available.signal();
}
lock.unlock();
}
}
@Override
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
for (int i = 0; i < size; i++) {
RunnableScheduledFuture<?> t = queue[i];
if (t != null) {
// 将队列内部数组的值全部设置为null
queue[i] = null;
// 所有任务对象的index都设置为-1
setIndex(t, -1);
}
}
size = 0;
} finally {
lock.unlock();
}
}
/**
* Returns first element only if it is expired.
* Used only by drainTo. Call only when holding lock.
*/
private RunnableScheduledFuture<?> peekExpired() {
RunnableScheduledFuture<?> first = queue[0];
// 如果队头元素存在,且已到期(expired) 即delay <= 0,返回队头元素,否则返回null
return (first == null || first.getDelay(NANOSECONDS) > 0) ? null : first;
}
@Override
public int drainTo(Collection<? super Runnable> c) {
if (c == null) {
throw new NullPointerException();
}
if (c == this) {
throw new IllegalArgumentException();
}
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture<?> first;
int n = 0;
// 延迟队列的drainTo只返回已过期的所有元素
while ((first = peekExpired()) != null) {
// 已过期的元素加入参数指定的集合
c.add(first); // In this order, in case add() throws.
// 同时将其从队列中移除
finishPoll(first);
// 总共迁移元素的个数自增
++n;
}
// 队列为空,或者队列头元素未过期则跳出循环
// 返回总共迁移元素的个数
return n;
} finally {
lock.unlock();
}
}
@Override
public Object[] toArray() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 队列底层本来就是数组,直接copy一份即可
return Arrays.copyOf(queue, size, Object[].class);
} finally {
lock.unlock();
}
}
}
3.对于周期性的任务ScheduledThreadPoolExecutor是如何进行调度的?
ScheduledThreadPoolExecutor允许用户提供三种不同类型的任务:
- 只需要调度一次的一次性延迟任务(通过schedule方法提交, 创建的任务对象period=0)
- 需要周期性调度的固定频率的任务(通过scheduleAtFixedRate方法提交,创建的任务对象period>0)
- 需要周期性调度的固定延迟的任务(通过scheduleWithFixedDelay方法提交,创建的任务对象period<0)
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (command == null || unit == null) {
throw new NullPointerException();
}
// 装饰任务对象
RunnableScheduledFuture<?> t = decorateTask(command,
new MyScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
// 提交任务到工作队列中,以令工作线程满足条件时将其取出来调度执行
delayedExecute(t);
return t;
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
if (command == null || unit == null) {
throw new NullPointerException();
}
if (period <= 0) {
throw new IllegalArgumentException();
}
// 固定周期重复执行的任务,period参数为正数
MyScheduledFutureTask<Void> scheduledFutureTask = new MyScheduledFutureTask<>(
command, null, triggerTime(initialDelay, unit), unit.toNanos(period));
// 装饰任务对象
RunnableScheduledFuture<Void> t = decorateTask(command, scheduledFutureTask);
// 记录用户实际提交的任务对象
scheduledFutureTask.outerTask = t;
// 提交任务到工作队列中,以令工作线程满足条件时将其取出来调度执行
delayedExecute(t);
return t;
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
// 固定延迟重复执行的任务,period参数为负数
MyScheduledFutureTask<Void> scheduledFutureTask =
new MyScheduledFutureTask<>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay));
// 装饰任务对象
RunnableScheduledFuture<Void> t = decorateTask(command, scheduledFutureTask);
// 记录用户实际提交的任务对象
scheduledFutureTask.outerTask = t;
// 提交任务到工作队列中,以令工作线程满足条件时将其取出来调度执行
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown()) {
// 线程池已经终止了,执行reject拒绝策略
super.reject(task);
} else {
// 没有终止,任务在工作队列中入队
super.getQueue().add(task);
// 再次检查状态,如果线程池已经终止则回滚(将任务对象从工作队列中remove掉,并且当前任务Future执行cancel方法取消掉)
// 在提交任务与线程池终止并发时,推进线程池尽早到达终结态
if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) {
task.cancel(false);
} else {
// 确保至少有一个工作线程会处理当前提交的任务
ensurePrestart();
}
}
}
第一种一次性的延迟任务的调度在前面的章节已经说的比较清楚了,就是简单的将任务加入定制的工作队列,等待线程池中的工作线程在任务调度时间达到要求时令任务出队并调度执行即可。
下面我们看看需要反复被调度的周期性任务是如何被调度的,重点关注之前没有展示的ScheduledFutureTask任务对象的run方法。
- 可以看到在任务对象的run方法中,如果是period为0的一次性任务,只需要简单的调用run方法即可。
- 而对于period不为0的周期性任务,首先需要通过runAndReset方法执行当前的调度操作,在操作结束之后通过setNextRunTime方法计算并设置当前周期性任务下一次要被调度的时间,
然后通过reExecutePeriodic方法将任务重新加入工作队列中,这样就能在对应的时间再被工作线程消费到并且调度执行了。如此循环往复则可以一直将周期性的调度任务运行下去。
任务对象调度相关逻辑
/**
* 调度任务对象
* */
private class MyScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V>{
/**
* 用于保证相同延迟时间的任务,其FIFO(先提交的先执行)的特性
* */
private final long sequenceNumber;
/**
* 当前任务下一次执行的时间(绝对时间,单位纳秒nanos)
* */
private long time;
/**
* 需要重复执行的任务使用的属性
* 1 period>0,说明该任务是一个固定周期重复执行的任务(通过scheduleAtFixedRate方法提交)
* 2 period<0,说明该任务是一个固定延迟重复执行的任务(通过scheduleWithFixedDelay方法提交)
* 3 period=0,说明该任务是一个一次性执行的任务(通过schedule方法提交)
* */
private final long period;
/**
* 定期任务实际执行的具体任务
* */
RunnableScheduledFuture<V> outerTask = this;
/**
* 基于二叉堆的延迟队列中的数组下标,用于快速的查找、定位
* */
int heapIndex;
/**
* 一次性任务的构造函数(one action)
* */
MyScheduledFutureTask(Runnable runnable, V result, long ns) {
super(runnable, result);
// 下一次执行的时间
this.time = ns;
// 非周期性任务,period设置为0
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
/**
* 周期性任务的构造函数
*/
MyScheduledFutureTask(Runnable runnable, V result, long ns, long period) {
super(runnable, result);
// 下一次执行的时间
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
/**
* 设置下一次执行的事件
* */
private void setNextRunTime() {
long p = this.period;
if (p > 0) {
// fixedRate周期性任务,time基础上单纯的加period就能获得下一次执行的时间
// (不用考虑溢出,因为如果因为time太大而溢出了(long类型溢出说明下一次执行时间是天荒地老),则永远不会被执行也是合理的)
this.time += p;
} else {
// fixedDelay周期性任务,下一次时间为当前时间+period(当前调度已经执行完成,fixedDelay任务第n+1的执行时间是第n次执行完成后+period)
// 下一次调度的时间(需要处理溢出)
this.time = triggerTime(-p);
}
}
@Override
public void run() {
boolean periodic = isPeriodic();
// 根据当前线程池状态,判断当前任务是否应该取消(比如已经是STOP了,就应该停止继续运行了)
if (!canRunInCurrentRunState(periodic)) {
// 不能正常运行,取消掉
cancel(false);
} else if (!periodic) {
// 非周期性任务,当做普通的任务直接run就行了
MyScheduledFutureTask.super.run();
} else if (MyScheduledFutureTask.super.runAndReset()) {
// 注意:runAndReset如果抛异常了,则不会走reExecutePeriodic逻辑重新加入工作队列,导致这个周期性的任务就不会再被执行了
// If any execution of the task encounters an exception, subsequent executions are suppressed
// 设置下一次执行的事件
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
}
/**
* 尝试重新提交并执行周期性任务(是属于ScheduledThreadPoolExecutor的方法)
* */
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
// 当前线程池状态允许执行任务,将任务加入到工作队列中去
super.getQueue().add(task);
// 再次检查,如果状态发生了变化,不允许了,则通过remove方法将刚加入的任务移除掉,实现回滚
// 和ThreadPoolExecutor一致都是为了让shutdown/stop状态的线程池尽量在状态变更和提交新任务出现并发时,不要去执行新任务尽早终止线程池
if (!canRunInCurrentRunState(true) && super.remove(task)) {
task.cancel(false);
} else {
// 确保至少有一个工作线程会处理当前提交的任务
super.ensurePrestart();
}
}
}
/**
* Returns current nanosecond time.
*/
final long now() {
return System.nanoTime();
}
/**
* 获得下一次调度的绝对时间
* */
private long triggerTime(long delay, TimeUnit unit) {
// 统一转成nanos级别计算
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
/**
* 获得下一次调度的绝对时间
* @param delay 延迟时间(单位nanos)
*/
long triggerTime(long delay) {
if(delay < (Long.MAX_VALUE >> 1)){
// delay小于Long.MAX_VALUE/2,肯定不会发生compareTo时的溢出问题,直接正常累加delay即可
return now() + delay;
}else{
// delay大于Long.MAX_VALUE/2,可能会发生compareTo时的溢出问题,在overflowFree检查并做必要的修正
return now() + overflowFree(delay);
}
}
固定频率/固定延迟的周期性任务到底有什么不同?
固定频率和固定延迟的周期性任务最大的区别就在于setNextRunTime方法中对于下一次调度时间计算的方式不同
- 提交任务时的period周期参数指定了每次调度的间隔时间,但在不同场景下的语义不同
- fixedRate周期性任务,time基础上单纯的加period就能获得下一次执行的时间
- fixedDelay周期性任务,下一次时间为当前时间+period(当前调度已经执行完成,fixedDelay任务第n+1的执行时间是第n次执行完成后+period)
周期性任务执行时出现异常会抑制后续的调度
- 需要特别注意的是,当周期性任务的run方法中的ScheduledFutureTask.super.runAndReset()出现异常时,run方法会直接抛出异常而退出,并不会执行后续的reExecutePeriodic重新入队操作,导致无声无息的中止了后续的周期性调度流程。
If any execution of the task encounters an exception, subsequent executions are suppressed -
因此用户自己的业务逻辑中只有在需要中断后续调度时才应该抛出异常,否则将会出现意想不到的问题。
我曾经就踩过坑,使用ScheduledThreadPoolExecutor周期性的向DB注册心跳时忘记catch异常,导致网络波动使得DB访问异常时节点的心跳续期也断了
ScheduledThreadPoolExecutor的缺点
ScheduledThreadPoolExecutor作为一个单机纯内存的延时/定时任务调度框架能够很好的应对日常开发中出现的多数需求,但其还是存在着一些缺陷。
- 从功能上来说,纯内存的设计使得调度任务没有持久化的功能,在服务宕机等极端情况下会丢失掉已经提交的任务;同时也缺乏集群内跨机器节点的分布式负载均衡的能力。
- 从性能上来说,ScheduledThreadPoolExecutor是基于完全二叉堆的,能以O(log n)对数的时间复杂度进行任务的入队/出队,且使用了一个全局的互斥锁来防止并发,因此其高并发场景下的吞吐量并不高。
而时间轮则能够在牺牲一定调度精度的前提下,将调度任务的入队/出队的时间复杂度降低至常数复杂度。在每秒有成百上千的任务被频繁提交/调度执行的场景下,时间轮的表现要远远优于ScheduledThreadPoolExecutor。
因此时间轮在kafka、netty等常见的中间件、框架中都使用时间轮而不是jdk的ScheduledThreadPoolExecutor来实现任务调度。
总结
- 本篇博客从源码的角度详细分析了jdk调度线程池ScheduledThreadPoolExecutor的工作原理。其中重点介绍了其是如何基于ThreadPoolExecutor实现多线程任务调度的,并从源码的角度分析了其存储任务、调度任务的一些细节(博客中只分析了比较核心的逻辑,有些旁路逻辑被省略了)。
- 在博客中还引入了同样用于任务调度的时间轮算法与之进行简单的比较(关于时间轮算法工作原理解析的博客会在后续发布)。
- 本篇博客的完整代码在我的github上:https://github.com/1399852153/Reinventing-the-wheel-for-learning (ThreadPool模块 MyScheduledThreadPoolExecutor类) 内容如有错误,还请多多指教。