前言
随着多核处理器的出现,如何轻松高效的进行异步编程变得愈发重要,我们看看在java8之前,使用java语言完成异步编程有哪些方案。
JAVA8之前的异步编程
- 继承Thead类,重写run方法
- 实现runable接口,实现run方法
- 匿名内部类编写thread或者实现runable的类,当然在java8中可以用lambda表达式简化
- 使用futureTask进行附带返回值的异步编程
- 使用线程池和Future来实现异步编程
- spring框架下的
@async
获得异步编程支持
使用线程池与future来实现异步编程
实现方式可谓是多种多样,这里我们使用线程池和future来实现异步编程,借着这个例子来讲述java8的组合式异步编程有着怎样的优势
//构造线程池
ExecutorService pool = Executors.newCachedThreadPool();
try {
//构造future结果,doSomethingA十分耗时,因此采用异步
Future<Integer> future = pool.submit(() -> doSomethingA());
//做一些别的事情
doSomethingB();
//从future中获得结果,设置超时时间,超过了就抛异常
Integer result = future.get(10, TimeUnit.SECONDS);
//打印结果
System.out.printf("the async result is : %d", result);
//异常捕获
} catch (InterruptedException e) {
System.out.println("任务计算抛出了一个异常!");
} catch (ExecutionException e) {
System.out.println("线程在等待的过程中被中断了!");
} catch (TimeoutException e) {
System.out.println("future对象等待时间超时了!");
}
}
然而这样的异步编程方式仅仅能满足基本的需要,稍微复杂的一些异步处理Future接口似乎就有点束手无策了,例如
- 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第
一个的结果。 - 等待 Future 集合中的所有任务都完成。
- 仅等待 Future 集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同
一个值) ,并返回它的结果。 - 通过编程方式完成一个 Future 任务的执行(即以手工设定异步操作结果的方式) 。
- 应对 Future 的完成事件(即当 Future 的完成事件发生时会收到通知,并能使用 Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)
这种感觉其实就很像没有stream之前的collections的操作感觉一样,同样的,对于future,java8提供了它的函数式升级版本CompletableFuture,从名字就可以看出来这绝对是future的升级版。
JAVA8中的组合式异步编程
使用CompletableFuture进行异步编程
事物的发展往往都是由简单->复杂->简单,这里我们同样遵循这样的规律,循序渐进。
下面的例子摘取《java8实战》
的异步编程章节,并做了简化。
我们假设现在我们有一项查询商品价格的服务十分耗时,所以毫无例外的我们想让查询最佳价格的服务以异步的形式执行。
最直接的方式是直接构建一个异步查询商品价格的api,并且返回,为了演示需要,编写一个线程等待一秒的方法来模拟长时间的请求。
public double getPrice(String product) {
return calculatePrice(product);
}
private double calculatePrice(String product) {
delay();
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
public static void delay() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
现在getPrice这方法是一个同步的方法,该方法在经过1秒的延迟之后会返回给我们一个商品的价格(这里只是简单的根据名字构造了一个随机数)
我们使用completFuture将getPrice
转化为异步方法,如下
public Future<Double> getAsyncPrice(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
double price = calculatePrice(product);
futurePrice.complete(price);
}).start();;
return futurePrice;
}
这里构造一个completableFuture对象,并另起一个异步线程,将异步计算的结果使用futurePrice.complete来接受,无需等待直接返回future结果
调用类使用Integer result = future.get(10, TimeUnit.SECONDS)
来接受返回的结果,如果等待超时则抛出异常。
另外,如果异步线程发生异常,并且在排查问题的时候想要知道具体是什么原因导致的,
可以在getAsyncPrice
方法中使用completeExcepitonally来得到异常信息并且结束这次异步任务,代码如下
public Future<Double> getPriceAsync(String product) {
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
try {
double price = calculatePrice(product);
futurePrice.complete(price);
} catch (Exception ex) {
futurePrice.completeExceptionally(ex);
}
}).start();
return futurePrice;
}
这样,基本的功能就实现了。
使用工厂类简化异步操作
也许你看到上面的代码,会说:"我晕,你这写法比原来还复杂哦,而且我也没看出啥区别啊。",是的,上文的写法可以算是原生态的写法了,目的为为下面的知识做一个简单的铺垫。
事实上,CompleteFuture本身提供了大量的工厂方法来供我们十分方便的实现一个异步编程,他封装了前篇一律的异常与结果接收,你只需要编写真正的异步逻辑部分就可以了,同时借住于lambda表达式,可以更进一步。
supplyAsync 方法接受一个生产者(Supplier)作为参数,返回一个 CompletableFuture
对象, 该对象完成异步执行后会读取调用生产者方法的返回值。 生产者方法会交由 ForkJoinPool
池中的某个执行线程(Executor)运行,但是你也可以使用 supplyAsync 方法的重载版本,传
递第二个参数指定不同的执行线程执行生产者方法。
于是上文的例子可以改写如下
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}
是不是简洁了许多呢?
可现在还有问题,这里我们成功的编写了一个十分简洁的异步方法,可实际的情况中,我们所能调用的API大部分都是同步的,因此下面将介绍如何使用异步的方法去操作这些同步API。
使用流异步操作同步API
我们现在有这么一个需求,它接受产品名作为参数,返回一个字符串列表,
这个字符串列表中包括商店的名称、该商店中指定商品的价格,商店集合以及接口设计如下。
List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
new Shop("LetsSaveBig"),
new Shop("MyFavoriteShop"),
new Shop("BuyItAll"));
public List<String> findPrices(String product);
使用同步的方法实现
这样的集合变换使用stream流来操作十分容易,代码如下
public List<String> findPrices(String product) {
return shops.stream()
.map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
.collect(toList());
}
stream流将 shop映射为了shop的名称以及该shop中商品的价格的字符串,并使用收集器进行收集。
使用异步的方法实现
事实上,我们完全可以使用流将shop映射成CompletableFuture对象,就好像在操作集合一样,代码如下
List<CompletableFuture<String>> priceFutures = shops.stream()
.map(shop -> CompletableFuture
.supplyAsync(() -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))))
.collect(toList());
使用这种方式,你会得到一个,List<CompletableFuture>,列表中的每个CompletableFuture 对象在计算完成后都包含商店的String类型的名称。但是,由于你用CompletableFutures 实现的findPrices方法要求返回一个List,你需要等待所有的future 执行完毕,将其包含的值抽取出来,填充到列表中才能返回。
为了实现这个效果,你可以向最初的 List<CompletableFuture> 施加第二个map 操作,对 List 中的所有future对象执行join操作,一个接一个地等待它们运行结束。注意CompletableFuture类中的join方法和Future接口中的get有相同的含义,并且也声明在Future 接口中,它们唯一的不同是join不会抛出任何检测到的异常。使用它你不再需要使用try / catch 语句块让你传递给第二个map方法的Lambda表达式变得过于臃肿。所有这些整合在一起,你就可以重新实现 findPrices 了,具体代码如下
public List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures = shops.stream()
.map(
shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product)))
.collect(Collectors.toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
}
以上的代码你可能会疑惑,为什么不直接按照shop->completableFuture->join->collect
的方式进行流处理呢?那是因为join这一步本身是阻塞的,对于流操作来说,前一个shop没有处理完之前,是不会处理下一个shop的,所以对于每一个shop,处理到join这一步的时候就会阻塞住等待1秒,这样的话,这个流水线本身就会变回阻塞的了。
而上文的编写方法可以看出 shop->completableFuture->collect
这个操作本身是非阻塞的,顺利的将所有的请求都发出去了,随后再使用join来完成结果的收集。
使用线程池来管控异步方法
前面在介绍工厂方法时提到,可以选择第二参数放入一个线程池来进行管控。
private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
接着在supplyAsync中使用该线程池即可,代码如下
CompletableFuture.supplyAsync(() -> shop.getName() + " price is " +
shop.getPrice(product), executor);
进阶的异步流操作
既然我们已经将异步操作与流相结合了,因此很容易的就会想到对于异步流来说,应该有会有类似于集合流的一些非常好用的API吧?事实上,JAVA8的确为我们提供了这些API。
构造同步和异步操作
如同集合流操作一样,异步流也可以提前安排一系列的任务,然后让异步任务有条不紊的按照这个顺序去执行。
- 同步任务
使用future.thenApply(Function)
来实现,该方法接受一个Function对象
你可以规划这样的任务 任务A(异步)->任务B(同步),语法可能是这样的
stream()
.map(xxx->supplayAsync(()->任务A)) //这一步已经异步的映射成了任务A
.map(future->future.thenApply(任务B)//执行同步的任务B
.collect
- 异步任务
与同步几乎一样,方法变为future.thenCompose(Function)
你可以规划这样的任务 任务A(异步)->任务B(同步)->任务C(异步),语法可能是这样的
stream()
.map(xxx->supplayAsync(()->任务A)) //这一步已经异步的映射成了任务A
.map(future->future.thenApply(任务B)//执行同步的任务B
.map(future->future.thenCompose(任务C))//再异步执行任务C
.collect
将两个 CompletableFuture 对象整合起来,无论它们是否存在依赖
使用thenCombine来完成,类似任务A与任务B,A是查询价格,B是查询汇率,这两个任务之间本身没有关联关系,所以可以同时发起,但你最后需要计算价格乘以汇率,因此在这两个任务完成之后需要对他们的结果进行合并,代码如下
Future<Double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product))//任务A
.thenCombine(
CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)), //任务B
(price, rate) -> price * rate); //任务A与任务B的合并操作
注意这里的任务A与任务B是异步的,但他们的合并操作是同步的,如果想要合并操作也是异步的,使用future.thenCombineAsync的异步方法版本。
对结果进行处理
使用thenAccept(Consumer)
以上都是对结果进行一些映射,你现在要对结果只进行处理,说白了就是前面的都是Function,现在要换成consumer,并且参数不再是异步任务,而是任务的结果值,举个例子,上文的任务A(异步)->任务B(同步)->任务C(异步) 的操作现在想到对他们的操作结果进行打印
就可以使用thenAccept(Consumer)
stream()
.map(xxx->supplayAsync(()->任务A)) //这一步已经异步的映射成了任务A
.map(future->future.thenApply(任务B)//执行同步的任务B
.map(future->future.thenCompose(任务C))//再异步执行任务C
.map(future->future.thenAccept(System.out::println))//将结果打印
.collect
使用allOf与anyOf对结果进行处理
需要注意的是在执行了Accpet方法之后,你得到的是一个 CompletableFuture流对象
你可以对这些流对象进行类似及早求值的操作,例如这条查询4个商家的价格服务只要有一个给出了返回结果就结束这次异步流。
CompletableFuture[] futures = findPricesStream("myPhone")
.map(f -> f.thenAccept(System.out::println))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.anyOf(futures).join();
allOf 工厂方法接收一个由 CompletableFuture 构成的数组,数组中的所有 Completable-Future 对象执行完成之后,它返回一个 CompletableFuture 对象。这意味着,如果你需要等待最初 Stream 中的所有 CompletableFuture 对象执行完毕,对 allOf 方法返回的CompletableFuture 执行 join 操作是个不错的主意。这个方法对“最佳价格查询器”应用也是有用的,因为你的用户可能会困惑是否后面还有一些价格没有返回,使用这个方法,你可以在执行完毕之后打印输出一条消息“All shops returned results or timed out” 。然而在另一些场景中,你可能希望只要 CompletableFuture 对象数组中有任何一个执行完毕就不再等待,比如,你正在查询两个汇率服务器,任何一个返回了结果都能满足你的需求。在这种情况下,你可以使用一个类似的工厂方法 anyOf 。该方法接收一个 CompletableFuture 对象构成的数组, 返回由第一个执行完毕的 CompletableFuture 对象的返回值构成的 Completable-Future
总结
本文是对Java8实战
中异步编程章节的一些整理和汇总,介绍了利用新增的completableFuture将异步任务与流操作集合起来实现组合式异步编程,利用工厂方法与函数接口可以大大的简化代码,同时提高代码的可阅读性,想要查看详细,可以自行翻阅该书。