简析Java中的util.concurrent.Future接口

时间:2022-06-02 00:32:58

在一个单线程应用中,当你调用一个方法只有计算结束才会返回结果( IOUtils.toString()  comes from Apache Commons IO ):
 

?
1
2
3
4
5
6
7
8
9
public String downloadContents(URL url) throws IOException {
  try(InputStream input = url.openStream()) {
    return IOUtils.toString(input, StandardCharsets.UTF_8);
  }
}
 
//...
 
final Future<String> contents = downloadContents(new URL("http://www.example.com"));

downloadContents() 看上去是无害的, 但是它需要任意长的时间来完成。同时,为了减少延迟,在等待结果的期间,你可能需要同时独立的处理其它的工作。以前你可能会启动一个新的线程 或者等待结果(共享内存,锁,糟糕的 wait()/notify()对).

通过 Future<T> 模式,它会变得明朗:
 

?
1
2
3
4
5
6
7
public static Future<String> startDownloading(URL url) {
  //...
}
 
final Future<String> contentsFuture = startDownloading(new URL("http://www.example.com"));
//other computation
final String contents = contentsFuture.get();

我们马上会实现 startDownloading(), startDownloading()不会被阻塞,而是等待外部的站点回应,你理解这一原则是很重要的。 相反,如果它快速返回了,返回一个轻量级的Future<String> 对象。 这个对象是一个promise那么将来字符串类型就是可用的,虽然我们不知道什么时候,但是会保留这个引用直到它返有结果返回,你就可以通过Future.get()来获取它。 换句话说,Future是一个代理或者一个对象的包装,不是真实的目标对象。一旦异步计算完成,你就可以提取它。 那么Future提供了什么样的接口呢?

Future.get()是最重要的方法。它阻塞和等待直到承诺的结果是可用状态, 因此如果我们确实需要这个字符串,就调用get() 方法然后等待。 还有一个接受超时参数的重载版本,如果哪里出现问题你就不用一直等待下去,超过设定时间就会抛出 TimeoutException。


在某些情况下,你可能想不停地偷偷看看Future是否可用了。这可以通过isDone()来完成。想象一个情景,你的用户等待某些异步的计算,你想让他知道这种情况, 同时去做一些其它的计算:
 

?
1
2
3
4
5
6
final Future<String> contentsFuture = startDownloading(new URL("http://www.example.com"));
while (!contentsFuture.isDone()) {
  askUserToWait();
  doSomeComputationInTheMeantime();
}
contentsFuture.get();

最后Future.get()调用的内容会保证马上返回,不会被阻塞,因为Future.isDone() 返回了true。如果你遵循这个模式,就不会忙于每秒百万次的交替等待和调用isDone()。


取消futrues是最后一个我们还没有覆盖到的。想象你启动了异步的工作并且你只能等待一些时间, 如果2秒钟后,我们放弃,或者把错误传递出去,或者采用临时方案解决它。然而,你是一个好市民,你应该告诉这个future对象:我不需要你了,你别管了。 那么你可以通过停止过时的任务,来节约资源。语法很简单:
 

?
1
contentsFuture.cancel(true);  //meh...


我们都喜欢隐藏的,布尔类型的参数,对吗?取消可以通过两种方式来实现:在任务启动前通过传递false参数来取消,前提是当Future表达的结果计算开始之前。一旦Callable.call()已经运行到一半,那么我们想让它结束,如果我们传递true,那么Future.call()就会具有侵入性,试图打断正在运行的工作。你觉得这样好吗?现象那些抛出InterruptedException这个声名狼藉的异常的方法,如Thread.sleep(), Object.wait(),Condition.await(),等,甚至包括Future.get(). 如果你被阻塞在这种方法并且有人决定取消你的调用,他们会毫无疑问的抛出InterruptionException,并发出有人要打断当前运行的任务。


因此我们现在明白了Future是什么--- 一个占位符,你可以在未来得到目标对象。就像对于一辆车,还没有制造出来的钥匙。但是你怎样才能在应用程序中获得Future的实例? 两种最普通的资源是线程池和异步方法(线程池支持)。因此, startDownloading()方法可以被重写为:
 

?
1
2
3
4
5
6
7
8
9
10
11
12
private final ExecutorService pool = Executors.newFixedThreadPool(10);
 
public Future<String> startDownloading(final URL url) throws IOException {
  return pool.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {
      try (InputStream input = url.openStream()) {
        return IOUtils.toString(input, StandardCharsets.UTF_8);
      }
    }
  });
}


虽然有大量的繁琐的语法问题,但是基本思想是简单的: 把需要长时间运行的计算包装到可调用的<String>,并submit()到线程池,这个线程池包含10个线程。 提交后返回Future<String>的实现,就像以某种方式链接到你的任务和线程池。明显的你的任务不会被立即执行,相反它被放到一个队列中,稍后会被线程拉出来, 现在需要搞清楚cancel()的两个特别的意义是什么——你可以取消在队列中停留的任务,也可以取消早已运行的任务,但这是一件比较复杂的事情。


你还可以在Spring 和 EJB 碰上Future。比如Spring框架的中你可以为方法加入@Async的注解:

?
1
2
3
4
5
6
7
8
@Async
public Future<String> startDownloading(final URL url) throws IOException {
  try (InputStream input = url.openStream()) {
    return new AsyncResult<>(
        IOUtils.toString(input, StandardCharsets.UTF_8)
    );
  }
}


注意,我们简单地通过包装结果到AsyncResult来实现Future,但是这个方法本身不会与线程池交互或者异步处理。稍后 Spring会代理所有的调用来startDownloading()并在线程池中执行。 在EJB中,相同的特性通过加@Asynchronousannotation 来完成。