Java多线程 -- JUC包源码分析14 -- ScheduledThreadPoolExecutor与DelayQueue源码分析

时间:2022-09-21 10:45:07

在前面的篇章中,我们分析了ThreadPoolExecutor,知道了execute和submit的内部实现原理,知道了Runnable/Callable的内在关系。

周期/非周期 AtFixedRate/WithFixedDelay

而ScheduledThreadPoolExecutor,正像其名字所反映的,实现了时间调度相关的功能,具体说来,有2个方面:
(1) 非周期性的

 //把一个Runnable/Callable 延迟到未来执行,延迟时间是delay,单位是TimeUnit
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit);

(2) 周期性的

//把一个Runable延迟initialDelay执行,同时以后周期性执行。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) ;

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) ;

那上面2个函数使用上面有什么区别呢?

AtFixedRate:顾名思义,按固定频率执行,与任务本身执行时间没有关系。(但这有个前提条件:任务执行时间必须小于间隔时间,比如间隔时间是5s,每5s执行一次任务,那任务的执行时间,必须小于5s。关于这1点,后面源码会详细分析)

WithFixedDelay: 按固定间隔执行,与任务本身执行时间有关。比如任务本身执行时间是10s,间隔2s,那下1次开始执行时间就是12s。

知道了ScheduledThreadPoolExecutor的用法,下面分析其源码实现

DelayQueue – Scheduled的基石

我们的分析,首先从其构造函数开始。看构造函数,我们会发现,其使用了一个我们之前没用到过的队列,即DelayedWorkQueue。

    public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}

看其源码,我们会发现,它同样实现了BlockingQueue接口。同时,它内部包装了一个DelayedQueue,其所有方法,都是代理给DelayedQueue来实现的。

//ScheduledThreadPoolExecutor的内部类
private static class DelayedWorkQueue
extends AbstractCollection<Runnable>
implements BlockingQueue<Runnable> {


private final DelayQueue<RunnableScheduledFuture> dq = new DelayQueue<RunnableScheduledFuture>();
public Runnable poll() { return dq.poll(); }
public Runnable peek() { return dq.peek(); }
public Runnable take() throws InterruptedException { return dq.take(); }
public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
return dq.poll(timeout, unit);
}

那这个DelayQueue,又是何方神圣呢?

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {


private transient final ReentrantLock lock = new ReentrantLock();
private transient final Condition available = lock.newCondition();
private final PriorityQueue<E> q = new PriorityQueue<E>();

...

看到这,我们知道了,DelayQueue其实就是一个线程安全的PriorityQueue(其内部数据结构就是2叉堆),跟我们前面所讲的PriorityBlockingQueue原理几乎一样。

既然是PriorityQueue,其放进去的元素,就需要比较大小,因为就必须实现Comparable接口。

那比较的什么东西呢?就是delay的时间,所以称之为DelayQueue。

public interface Delayed extends Comparable<Delayed> {

long getDelay(TimeUnit unit);
}

所以,所有放进DelayQueue的元素,都要实现getDelay函数,实现Comparable接口。

DelayQueue的一个重要特性

DelayQueue有一个重要特性:往外take元素的时候,如果时间还未到,也就是delay > 0,调用者会阻塞,直到直接到了,元素才拿的出去。

    public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
available.await(); //队列为空,阻塞
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
long tl = available.awaitNanos(delay); //任务的时间未到,阻塞
} else {
E x = q.poll(); //时间到了,任务拿出来
assert x != null;
if (q.size() != 0)
available.signalAll(); //唤醒其他的消费者
return x;

}
}
}
} finally {
lock.unlock();
}
}

schedule内部实现

schedule

有了DelayQueue这个大杀器,schedule就很容易实现了。如下:

    public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));

delayedExecute(t); //不是execute,自己实现的delayedExecute
return t;
}

private void delayedExecute(Runnable command) {
if (isShutdown()) {
reject(command);
return;
}

if (getPoolSize() < getCorePoolSize())
prestartCoreThread(); //线程数 < corePoolSize,就把线程数开到等于corePoolSize

super.getQueue().add(command); //新进来的请求,直接扔到DelayQueue里面
}

ScheduledFutureTask

上面的schedule过程很简单,直接把任务扔进队列里就可以了。下面复杂一点的,是拿出来执行的过程,也就是ScheduledFutureTask。跟上一篇讲的FutureTask类似,它也要实现run函数:

        public void run() {
if (isPeriodic())
runPeriodic(); //如果是周期性的,调用runPeriodic
else
ScheduledFutureTask.super.run(); //不是周期性任务,直接执行。因为从DelayQueue拿出来的时候,就正好是执行时间
}
        private void runPeriodic() {
boolean ok = ScheduledFutureTask.super.runAndReset(); //先执行任务,并且复原AQS
boolean down = isShutdown();

if (ok && (!down ||
(getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
!isStopped()))) {
long p = period;
if (p > 0) //p > 0 代表AtFixedRate
time += p; //从上1次任务开始时间算起,时间加上p
else // p < 0 代表WithFixedDelay
time = triggerTime(-p); //从当前时间算起,时间加上-p

ScheduledThreadPoolExecutor.super.getQueue().add(this); //关键点:任务执行完了,把任务的时间延期后,再次放入队列。从而实现了周期性。
//另外,因为是任务执行完了,才放进去的。因此任务的执行时间必须小于间隔时间,也就是前面所讲的AtFixedRate
}
else if (down)
interruptIdleWorkers();
}

下面看一下其实现的Comparable接口:

        public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}

public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber) //关键点:当两个任务的delay时间相等时,比较序列号,从而保证任务按顺序执行。序列号是在构造函数里面创建的,原子自增。
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0)? 0 : ((d < 0)? -1 : 1);
}

scheduleAtFixedRate/scheduleWithFixedDelay

//2者的差别很小,一个传进去的正的period,一个传进去的负的delay。这也印证了上面所分析的runPeriodic
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Object>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period)));
delayedExecute(t);
return t;
}

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Boolean>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay)));
delayedExecute(t);
return t;
}

总结

(1)ScheduledThreadPoolExecutor的核心实现依靠DelayQueue,其内部是一个PriorityQueue

(2)周期性的实现原理:执行完一个Task之后,把Task的时间 加上一个周期,重新扔回到队列里面。

(3)AtFixedRate和WithFixedDelay的区别,就是计算任务的下1次开始时间不一样:一个是从上一次开始时间 + 一个周期,一个是从上一次结束时间(即当前时间) + 一个周期。