Java Thread&Concurrency(7): 深入理解Callable/Future(FutureTask)接口及其实现

时间:2021-01-25 07:01:26

背景(注释):

使用Callable可以定义一个返回结果的任务(过程中可能抛出异常)。

实现它只需要返回一个结果,不提供参数。

这个Callabel接口类似于Runnable,两者都是被设计用于被其他线程执行。区别是Runnable无法返回结果以及不能抛出受检查异常。

Executors中包含一共公共方法用于转化其他的实例为Callable实例。

public interface Callable<V> {
V call() throws Exception;
}

通过Future可以得到异步计算的结果。这个接口支持检查任务是否执行完成,等待任务完成,以及获取计算完成之后的结果。计算结果只能通过get方法来获取,当任务未完成时会阻塞。通过cancel方法实现取消机制。附加的方法被提供能够用于确定任务已经正式完成或者被取消。当一个计算任务完成,任务的状态就不能变为取消。如果你使用Future的目的仅仅只是为了得到取消的能力,那么你可以声明Future<?>以及通过返回null作为一个结果。

public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}


使用场景:

Future通过与ExecutorService、Callable配合使用,从而得到计算结果。

interface ArchiveSearcher { String search(String target); }
  class App {
    ExecutorService executor = ...
    ArchiveSearcher searcher = ...
    void showSearch(final String target)
        throws InterruptedException {
      Future<String> future
        = executor.submit(new Callable<String>() {
          public String call() {
              return searcher.search(target);
          }});
      displayOtherThings(); // do other things while searching
      try {
        displayText(future.get()); // use future
      } catch (ExecutionException ex) { cleanup(); return; }
    }
  }

FutureTask实现了Future以及Runnable接口,所以能够被Executor执行,比如上面的例子可以用如下的例子代替:

FutureTask<String> future =
new FutureTask<String>(new Callable<String>() {
public String call() {
return searcher.search(target);
}});
executor.execute(future);


下面我们再来介绍JUC中实现了Future/Runnable的一个类:FutureTask。

首先FutureTask,作为一个可取消的异步计算任务。这个类提供了一个Future的基本实现,以及开始计算和取消计算的方法,查询计算是否完成,以及索取计算结果的方法。结果仅在计算完成之后才能获取。get方法将会在计算结果还没时阻塞。计算完成之后,这个计算就不能重新开启或者取消了(除非通过runAndRest方法)。

FutureTask能够用于包装Callable或者Runnable。因为FutureTask实现了Runnable,一个FutureTask能够被提交到Executor去执行(事实上也是这么构造的)。这个类还提供了受保护的方法能够用于定制自己的策略。


实现:

首先我们来看实际执行任务的run方法:

    public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

详情如下:

  • 首先检查state假如不为NEW或者没有设置当前线程为执行线程,则任务已经被执行过或者被其他线程执行中,返回。
  • 取得调用任务Callable,那么设置执行任务,并且用ran记录任务状态,假如有异常则执行setException。
  • 否则ran为true,任务执行完成则调用set方法。
  • 最后runnner置为null,假如此刻状态>=INTERRUPTING,则退让等待状态变为INTERRUPTED。
再看setException和set方法:
    protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}

 

   protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
可以看出来他们之间的区别仅在于EXCEPTIONAL和NORMAL,这里EXCEPTIONAL为异常状态,NORMAL表明任务已经正式完成。

这个还有个COMPLETING中间状态(这个是事实上的决定任务结果的CAS操作,它后面的put操作是在结果已定的情况下执行,所以不需要CAS,并且这一步的作用是happens-before,传递结果值outcome)。


我们再来看get操作:

    public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
分别调用await的限时和非限时版本,最后返回的值<=COMPLETING则为超时状态。

    private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}

详情如下:

  • 与所有其他的阻塞策略相比,这里较简单,探测的条件为s>COMPLETING。
  • 首先探测线程是否中断,如果中断则尝试从当前栈中删除这个等待者,并抛出异常InterruptedException。
  • 在状态为COMPLETING时采取退让策略yield。
  • 否则构造wait节点,进入LIFO队列,然后再限时或者非限时阻塞。
我们再来看set和setException中的finishCompletion方法,它实际上是和等待线程的协作:
    private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}

done();

callable = null; // to reduce footprint
}
这个方法是在state改变之后调用的,采用的方式是不断探测waiters(FIFO队列),当它不为空时unpark队列中的每一个线程。所以等待的线程保证可以被唤醒(在完成或者异常的情况下)。done方法可以用于定制。
我们最后来看取消方法:
    public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
mayInterruptIfRunning用于指示是否强制取消。
详情如下:
  • 首先判断当前状态state,假如已经改变或者CAS操作在此失败,那么久返回false说明取消失败(原因是任务执行完成或者任务异常)。
  • 成功之后假如参数为true,则试着中断当前的执行任务,并且将状态最终改变为INTERRUPTED。
  • 最后同样调用finishCompletion并返回true。