Java并发编程(九)《Executor框架》
@(并发)
9.1 Executor框架简介
9.1.1 Executor框架的两级调度模型
9.1.1.1 两级调度模型图
9.1.1.2 解释
- 在上层,java多线程程序将应用分解成若干的任务提·交给用户调度器(Executor框架),然后Executor框架将这些任务映射到固定数量的线程处理。
- 在下层,os将这些固定数量的线程映射到cpu上。
9.1.2 Executor框架的成员
9.1.2.1 任务接口(Runnable或者Callable)
Runnable或者Callable接口的实现类都可以交给ThreadPoolExecutor和ScheduledThreadPoolExecuotr线程池处理:
void execute(Runnable command);
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
9.1.2.2 任务的执行(线程池)
提交的任务由线程池执行,线程池分为ThreadPoolExecutor(立即执行任务的线程池)和ScheduledThreadPoolExecuotr(延时或者定时执行任务的线程池), 线程池的行为由Executor以及继承Execuotr的ExecutorService接口定义.
9.1.2.3 异步计算的结果返回(Future和FutureTask)
当submit提交任务给线程池时,会立即返回Future的对象,如果主线程想要获取任务异步计算完成的结果,可以Future.get(),当任务没有执行或者执行中时,主线程会阻塞在Future.get()方法上,当任务异步执行完毕后主线程会被唤醒立即返回。
9.1.3 Executor框架的类与接口UML
![Alt text](./Executor框架类UML 1.png)
9.1.4 Executor框架的的使用
![Alt text](./Executor框架使用示意图 1.png)
要结果返回的向线程池提交任务的过程:
1.将Runnable 任务包装成FutureTask类型的对象,提交给线程池执行,然后返回FutureTask对象
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
2.通过调用Future.get()方法等待任务异步执行完毕,主线程也可以通过Future.cancel取消任务。
Future<?> future = pool.submit(new Runnable() {
public void run() {
System.out.println("poll submit");
}
});
//当提交的任务还没有执行或者执行中时,主线程会在get方法上阻塞等待
if(future.get()!=null){
System.out.println("end");
}
9.2 核心线程池ThreadPoolExcutor
9.2.1 固定线程数的FixedThreadPool
Executors.newFixedThreadPool(10);
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
9.2.3 单个线程执行的SingleThreadPool
Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
9.2.2 适合执行短期异步任务的CachedThreadPool
Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
9.3 定时调度线程池ScheduledThreadPoolExecutor
9.3.1 简介
- ScheduledThreadPoolExecutor继承ThreadPoolExecutor,是对一般线程池的扩展,实现了延时运行任务或者定时执行某个任务的功能。
- 功能和Timer相似,但它更能更加灵活强大。Timer只是单个线程执行,而ScheduledThreadPoolExecutor可以是多个线程并发执行。
- 它使用DelayQueue*延时队列作为任务队列。
9.3.2 延时和周期性任务
ScheduledThreadPoolExecutor通过schedule()、scheduleAtFixedRate(),scheduleWithFixedDelay()提交一次性延时和周期性任务:
public class ScheduledThreadPoolTest {
public static void main(String[] args) {
//有10个核心线程的定时调度线程池
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
/** * 一次性调度任务:延时1s执行任务 */
scheduledExecutorService.schedule(new ScheduledTask(), 1, TimeUnit.SECONDS);
/** * 周期性调度任务: * 初始化延时1s执行任务, * 周期为1s:上一次任务执行开始时间到下一次任务开始执行间隔1s(下一个任务不要等前一个任务执行完) */
scheduledExecutorService.scheduleAtFixedRate(new ScheduledTask(), 1, 1, TimeUnit.SECONDS);
/** * 周期性调度任务: * 初始化延时1s执行任务, * 周期为1s:上一次任务执行结束时间到下一次任务开始执行间隔1s(下一个任务要等前一个任务执行完) */
scheduledExecutorService.scheduleWithFixedDelay(new ScheduledTask(), 1, 1, TimeUnit.SECONDS);
}
static class ScheduledTask implements Runnable {
public void run() {
System.out.println("scheduled task");
}
}
}
9.3.3 运行机制
- 通过调用scheduleAtFixedRate()或者scheduleWithFixedDelay()方法,提交一个ScheduledFutureTask任务给线程池:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
//组装ScheduledFutureTask任务对象
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Object>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period)));
//将ScheduledFutureTask任务对象交给线程池执行
delayedExecute(t);
return t;
}
ScheduledFutureTask是ScheduledThreadPool的内部类,是一个具有延时异步特性的任务类,任务接口实现了Delay接口,需要重写getDelay()和compareTo()方法 :
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
/** * 每个任务延时的时间,当时间没有到达到time时,此任务是不会被取出的 * 同时将任务添加到Delay队列中时,队列会根据time排序,最早的time时间排在最前面,优先取出 */
private long time;
/** Sequence number to break ties FIFO */
/** * 每个任务被创建时唯一的序列号,由线程池上静态变量AtomicLong sequencer线程安全的有序创建。 * 保证任务创建时的有序性FIFO,同时当阻塞队列中两个任务延时时间time一样时,可以根据谁先创建的sequenceNumber序列号排序,小的先被创建先执行 */
private final long sequenceNumber;
/** * 任务重复执行的周期性时间 * 正值:fixed-rate execution代表从上个任务开始时间间隔period,周期性重复执行 * 负值:ixed-delay execution代表从上个任务结束时间间隔-period,周期性重复执行 * 0:一次性不重复执行 */
private final long period;
/** * 获取任务的延时到期的剩余时间,单位纳秒: * 当getDelay()==0时,代表任务到期了,需要立即执行 */
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}
/** * 任务在延时队列中比较排序的依据: * 延时时间小的排在队列前面,优先被取出 * 当延时时间相同时,看任务的唯一自增序列,先创建的排在前面 */
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0)? 0 : ((d < 0)? -1 : 1);
}
}
- 当线程池此时的线程数<核心线程数时,创建Worker线程工作;否则直接添加到延时任务队列DelayQueue中,DelayQueue由优先级队列PriorityQueue组成,所以会把提交的任务根据延时时间排序,延时先创建的任务排在最前面:
private void delayedExecute(Runnable command) {
if (isShutdown()) {
reject(command);
return;
}
// 当线程池此时的线程数<核心线程数时,创建Worker线程工作
if (getPoolSize() < getCorePoolSize())
prestartCoreThread();
//否则直接添加到延时任务队列DelayQueue中
super.getQueue().add(command);
}
- 线程池里的核心线程并发循环take()延时队列,获取最先延时到期的任务(队头),若队头任务还没到期,则Worker线程take()阻塞等待。到期后,Worker线程运行ScheduledFutureTask.run():
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
// ScheduledFutureTask实现了Runable接口,重写了run方法
public void run() {
if (isPeriodic())
runPeriodic();
else
ScheduledFutureTask.super.run();
}
}
- Worker线程执行完当前任务后,若执行的是周期性的任务,则计算下次任务开始时间time,重新加入到任务队列中:
//判断是否为周期性任务,若是执行runPeriodic
public boolean isPeriodic() {
return period != 0;
}
//周期性执行任务实现
private void runPeriodic() {
//1.先执行当前任务
boolean ok = ScheduledFutureTask.super.runAndReset();
boolean down = isShutdown();
// Reschedule if not cancelled and not shutdown or policy allows
//2.执行完当前任务后,若任务没被取消,线程池没有关闭,则计算下一次重复任务的运行时间。
if (ok && (!down ||
(getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
!isStopped()))) {
long p = period;
if (p > 0)
//当调用scheduleAtFixedRate提交任务时,表示下次任务执行的时间=这次任务的开始时间+period
time += p;
else
//当调用scheduleWithFixedDelay提交任务时,表示下次任务执行的时间=这次任务的结束时间now()+period
time = triggerTime(-p);
//3.修改了当前任务下一次执行的时间,重新提交给任务队列中,以便周期性执行
ScheduledThreadPoolExecutor.super.getQueue().add(this);
}
else if (down)
interruptIdleWorkers();
}