JAVA并发编程之——线程池

时间:2022-02-25 18:01:03

Executor接口

public interface Executor {

void execute(Runnable command);
}

Executor提供了操作Runnable的接口,我们可以直接执行Runnable的方法,也可以创建Thread来执行Runnable,也可以根据实际情况把Runnable放入队列中按顺序执行。

ExecutorService接口

public interface ExecutorService extends Executor {

void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

//提交有返回结果的任务,返回一个Future对象
<T> Future<T> submit(Callable<T> task);
//执行成功返回给定的结果
<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

//执行系列任务 并返回Future列表
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

//执行系列任务 设置超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

//执行系列任务 返回任意成功执行任务的结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

AbstractExecutorService抽象类

AbstractExecutorService实现了ExecutorService的submit(),invokeAny()、invokeAll()方法。

public abstract class AbstractExecutorService implements ExecutorService {

//创建FutureTask对象
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/

public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/

public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/

public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

/**
* the main mechanics of invokeAny.
*/

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);

// For efficiency, especially in executors with limited
// parallelism, check to see if previously submitted tasks are
// done before submitting more of them. This interleaving
// plus the exception mechanics account for messiness of main
// loop.

try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();

// Start one task for sure; the rest incrementally
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;

for (;;) {
Future<T> f = ecs.poll();
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
break;
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
f = ecs.take();
}
if (f != null) {
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}

if (ee == null)
ee = new ExecutionException();
throw ee;

} finally {
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}

public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}

public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));

final long deadline = System.nanoTime() + nanos;
final int size = futures.size();

// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
return futures;
}

for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
if (nanos <= 0L)
return futures;
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}

}

ThreadPoolExecutor实现类

线程池的状态、数量

    // 线程池的状态由ctl是一个由workCount、runState两个字段包装后的AtomicInteger值
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;//Integer.SIZE=32
////线程池最大线程数=536870911(2^29-1),运算是左移29 位
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits(32位中的高3位表示)
//runState控制生命周期
//RUNNING: 接受任务并执行队列中的任务
//SHUTDOWN:不接受任务但执行队列中的任务
//STOP:不接受任务也不执行队列中的任务并中断执行中的任务
//TIDYING:所有的任务被中止,workerCount是0,线程执行terminated()方法转换状态
//TERMINATED:terminated()执行完成
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

/*
* Bit field accessors that don't require unpacking ctl.
* These depend on the bit layout and on workerCount being never negative.
*/


private static boolean runStateLessThan(int c, int s) {
return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}

private static boolean isRunning(int c) {
return c < SHUTDOWN;
}

构造函数


/**
*线程池所使用的缓冲队列
*/

private final BlockingQueue<Runnable> workQueue;

/**
* 线程工厂
*/

private volatile ThreadFactory threadFactory;

/**
* 任务拒绝策略
* CallerRunsPolicy:执行excute的线程直接执行该任务
* AbortPolicy:拒绝任务并抛出RejectedExecutionException异常
* DiscardPolicy:处理程序拒绝任务,默默地丢弃了任务。
* DiscardOldestPolicy:抛弃队列第一个为执行的任务,提交该任务
*/

private volatile RejectedExecutionHandler handler;

/**
* 空闲工作线程的空闲时间,当线程数量大于corePoolSize时回收线程
*
*/

private volatile long keepAliveTime;

/**
* 默认false
* 如果allowCoreThreadTimeout设置为true,则所有线程均会退出直到线程数量为0
*/

private volatile boolean allowCoreThreadTimeOut;

/**
* 线程池维护线程的最少数量
*/

private volatile int corePoolSize;

/**
* 线程池维护线程的最大数量
*/

private volatile int maximumPoolSize;

/**
* 默认AbortPolicy策略
*/

private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();

/**
* 给定线程工厂和拒绝处理器
*/

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

/**
*给定默认的拒绝处理器
*/

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}

/**
* 给定默认的线程创建工厂
*/

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}


/**
* 根据给定的初始化参数创建一个ThreadPoolExecutor对象
*
* @param corePoolSize 核心线程池线程数量,即使线程池中存在空闲的线程也
* 会创建新的线程来保持, 除非设置allowCoreThreadTimeOut
* @param maximumPoolSize 线程池中允许的最大数量
* @param keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间
* @param unit keepAliveTime的时间单位
* @param workQueue 阻塞任务队列,队列只持有提交的Runnable任务
* @param threadFactory executor创建线程的工厂
* @param handler 当线程和队列达到上线时任务线程的处理器
*/

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

任务的创建

public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

/**
* 任务会在新的线程或者已存在的线程执行
*
*如果任务未提交或者未执行,是线程池关闭或容量达到上限
*/

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1、如果线程的数量小于corePoolSize的数量,
* 新建一个新的线程并将该任务当作它的第一个任务
* 调用addWorker之前检查runState和workerCount
* 2、任务成功加入队列,仍需要检查避免线程池在添加线程的过程中关闭
*
* 3、如果队列已满,我们创建新的线程,如果创建线程失败拒绝任务
*/

int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//当前线程数大于核心线程数或者addWroker失败,需要把任务提交到任务队列,等待Worker线程空闲后处理
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//检查线程池是否关闭,若关闭并且线程没有被执行则删除这个任务
if (! isRunning(recheck) && remove(command))
reject(command);
//如果当前线程池数量为0则创建新线程(刚加入队列的任务也应该执行完成)。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//线程数量大于corePoolSize并且队列已满
else if (!addWorker(command, false))
//如是提交任务失败则调用reject处理失败任务
reject(command);
}



/*
*
* @param firstTask
*
* @param
*/

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
//线程数量达到上限
//core true表示添加任务时线程数量小于corePoolSize,并使用corePoolSize来检查线程池状态;
// false则使用maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//cas修改增加线程池所拥有的线程数(CAS不懂)
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//用firstTask创建Worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//workers其实是线程存储
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//启动任务线程
t.start();
workerStarted = true;
}
}
} finally {
//任务没有被启动,从workers中删除,类似回滚操作
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

任务的创建总结:

  • 当前线程池中的数量小于corePoolSize,创建并添加的任务。

  • 当前线程池中的数量大于等于corePoolSize,缓冲队列workQueue未满,那么任务被放入缓冲队列、等待任务调度执行。

  • 当前线程池中的数量大于等corePoolSize,缓冲队列workQueue已满,并且线程池中的数量小于maximumPoolSize,新提交任务会创建新线程执行任务。

  • 如果当前线程池中的数量大于等corePoolSize,缓冲队列workQueue已满,并且线程池中的数量等于maximumPoolSize,新提交任务由Handler处理。

不管什么条件,添加任务的过程总是不停的检查来确保线程池是正常状态,否则不会添加任务。

任务执行

想要看线程是如何运行的就必须看一下Worker的构造函数


private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

final Thread thread;

Runnable firstTask;
/** 记录该线程完成了多少个任务(也就是每一个线程) */
volatile long completedTasks;

/**
* 创建一个Thread时候把this传入,
* 这样的话如果我调用Worker.thread.start()就相当于该线程会执行Worker里的run方法了
*/

Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
}

下面详细分析一下runWorker

final void runWorker(Worker w) {
// 当前线程为什么不是从Worker中取出来?
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
//用于判断线程是否由于异常终止,如果不是异常终止,在后面将会将该变量的值改为false
boolean completedAbruptly = true;
try {
//如果存在firstTask则直接执行该任务,否则从任务队列里阻塞获取任务执行
while (task != null || (task = getTask()) != null) {
w.lock();
//线程池状态已被标为停止 或 线程中断 的情况下,如果当前线程没有被中断则中断当前线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//预留方法,子类继承实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// //预留方法,子类继承实现
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

下面看看getTask是如何从堵塞队列取出任务的

private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 线程池关闭,队列中的任务仍需要执行
//线程池停止,队列中的任务不再执行
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
//线程池中的线程超过设置的corePoolSize参数,
//如果等待keepAliveTime时间后仍然没有新的任务分配给它,那么这个线程将会被回收
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

Executors 线程池的配置

Executors在这里可以看做是一个工厂,生成不同的线程池,根据参数的不同生成不同的ThreadPoolExecutor:

创建一个线程数目固定的线程池,配置的corePoolSize与maximumPoolSize大小相同,同时使用了一个*LinkedBlockingQueue存放阻塞任务,因此多余的任务将存在再阻塞队列,不会由RejectedExecutionHandler处理

//指定默认的线程工厂
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

创建一个只支持一个线程的线程池,配置corePoolSize=maximumPoolSize=1,*阻塞队列LinkedBlockingQueue;保证任务由一个线程串行执行

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}

创建一个缓冲功能的线程池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及一个无容量的阻塞队列 SynchronousQueue,因此任务提交之后,将会创建新的线程执行;线程空闲超过60s将会销毁

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}

创建一个有定时功能的线程池,配置corePoolSize,*延迟阻塞队列DelayedWorkQueue;

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

到这里,线程池的使用我们已经简单的了解了,ScheduledThreadPoolExecutor定时线程池与创建不同线程池时使用的Queue在别的文章中分析。