Java多线程 -- JUC包源码分析13 -- Callable/FutureTask源码分析

时间:2022-01-30 07:31:01

关于Runnable,我们都已经很熟悉了。在上一篇,我们也分析了ThreadPoolExecutor用来执行任务的接口execute,如下所示:

public interface Runnable {
public abstract void run();
}

public void execute(Runnable command) {
。。。
}

这里的execute(..)是没有返回值的。如果我们希望主线程把任务扔给线程池执行完毕之后,能直接获取任务执行结果,那么就需要一个“有返回值的Runnable”,也就是本文要讲的Callable。


Callable使用方式

Callable接口定义如下:

//和Runnable相比,Callable主要就是多了一个返回值。
public interface Callable<V> {
V call() throws Exception;
}

使用方式如下:

Callable<String> c = new XXXCallable<String>();   //自定义Callable

Future<String> f = executor.submit(c); //把Callable交给线程池执行,返回一个票据

String result =- f.get(); //通过这个“票据”取回结构。如果任务没有计算完,调用者一直阻塞在这里

在JDK 1.6的线程池框架中,Callable的执行,用的是submit接口。但这个submit接口并不是ThreadPoolExecutor的一个函数,而是其父类AbstractExecutorService的一个函数,源码如下:

public abstract class AbstractExecutorService implements ExecutorService {
。。。
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask); //关键点:模板方法,被子类ThreadPoolExecutor重写
return ftask;
}

。。。
}

从上面可以看出,Callable其实是用Runnable实现的。在submit内部,把Callable通过FutureTask这个Adapter,转化成了Runnable,然后通过execute执行。

相关的类的关系图如下:

Callable/Runnable关系图

Java多线程 -- JUC包源码分析13 -- Callable/FutureTask源码分析

从这个图中,可以看到Callable、FutureTask、Runnable3者之间的关系:
(1)FutureTask就是个Adapter,在submit内部,把Callable转换成Runnable,然后通过execute执行。execute执行的run方法,也就是FutureTask的run方法,这个稍候来分析。
(2)FutureTask就相当于一个“票据”,把任务塞给了execute执行,把这个票据返回给调用者。调用者将来可以用这个“票据”的get方法,把结果取回去。

public interface Future<V> {
。。。
V get() throws InterruptedException, ExecutionException;


V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
。。。
}

那如何实现future.get()阻塞主线程,直到结果返回呢?答案就在FutureTask的2个关键函数里面。

FutureTask内部实现

public class FutureTask<V> implements RunnableFuture<V> {

//实现的Runnable接口,被executor.execute执行
public void run() {
sync.innerRun();
}

//取回结果
public V get() throws InterruptedException, ExecutionException {
return sync.innerGet();
}

private final Sync sync;
...
}

通过上述2个函数的对照,大致也可以猜测出实现的思路:

在run里面,先加锁,然后执行任务,执行完再释放锁;这样调用者调用get()的时候,拿不到锁,就会阻塞,直到任务执行完毕,释放锁。

下面就看一下其具体代码:

//FutureTask的内部类Sync,和前面分析过的Semaphore/CountDownLatch类似
private final class Sync extends AbstractQueuedSynchronizer {

private static final int RUNNING = 1; //执行任务之前,置成Running
private static final int RAN = 2; //执行任务结束,置成Ran
private static final int CANCELLED = 4; //被取消,置成Cancelled

void innerRun() {
if (!compareAndSetState(0, RUNNING)) //任务开始之后,把其置为Running状态,也即意味着,对AQS加锁
return;
try {
runner = Thread.currentThread();
if (getState() == RUNNING)
innerSet(callable.call()); //调用call执行任务,同时在innerSet里面,设置返回值,并释放AQS的锁
else
releaseShared(0); //如果任务被取消,直接释放锁
} catch (Throwable ex) {
innerSetException(ex);
}
}

V innerGet() throws InterruptedException, ExecutionException {
acquireSharedInterruptibly(0); //刚进去,拿不到锁,阻塞在这里。等上面的run函数执行完,把锁释放
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}

protected int tryAcquireShared(int ignore) {
return innerIsDone()? 1 : -1; //
}
。。。
}

总结: 理解了前几篇的AQS原理,再结合Callable/Runnable的关系,就很容易理解Callable/Future的实现原理了。