Future模式
【1】Future模式是多线程开发中常见的设计模式,它的核心思想是异步调用。对于Future模式来说,它无法立即返回你需要的数据,但是它会返回一个契约,将来你可以凭借这个契约去获取你需要的信息。
【2】通俗一点就是生产者-消费者模型的扩展。经典“生产者-消费者”模型中消息的生产者不关心消费者何时处理完该条消息,也不关心处理结果。Future模式则可以让消息的生产者等待直到消息处理结束,如果需要的话还可以取得处理结果。
java中是如何实现Future模式
【1】直接继承Thread或者实现Runnable接口都可以创建线程,但是这两种方法都有一个问题 就是:没有返回值,也就是不能获取执行完的结果。
【2】因此java1.5就提供了Callable接口来实现这一场景,而Future和FutureTask就可以和Callable接口配合起来使用。【从而达到Future模式的效果】
Callable和Runnable的区别
【1】源码展示
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
【2】分析说明
Runnable 的缺陷:1.不能返回一个返回值 2.不能抛出 checked Exception。
Callable的call方法可以有返回值,可以声明抛出异常。
【3】疑问解析
1)为什么需要 Callable?
Callable 配合 Future 类 可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果,这些功能都是 Runnable 做不到的,因为它没有返回值,不能抛出异常。
了解Future接口
【1】介绍 :Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。 必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
【2】源码展示
public interface Future<V> { // 取消任务的执行。参数指定是否立即中断任务执行,或者等等任务结束 boolean cancel(boolean mayInterruptIfRunning); //任务是否已经取消,任务正常完成前将其取消,则返回true boolean isCancelled(); //需要注意的是如果任务正常终止、异常或取消,都将返回true boolean isDone(); //取得返回对象 V get() throws InterruptedException, ExecutionException; //取得返回对像,允许等待设置的时间范围 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
了解FutureTask类(Future接口的实现类)
【1】介绍说明
1)该对象相当于是消费者和生产者的桥梁,消费者通过FutureTask存储任务的处理结果,更新任务的状态:未开始、正在处理、已完成等。而生产者拿到的 FutureTask被转型为Future接口,可以阻塞式获取任务的处理结果,非阻塞式获取任务处理状态。
2)FutureTask既可以被当做Runnable来执行,也可以被当做Future来获取Callable的返回结果。
【2】代码展示
0)继承关系
public class FutureTask<V> implements RunnableFuture<V> public interface RunnableFuture<V> extends Runnable, Future<V>
1)属性值
// 表示当前任务的状态 private volatile int state; // 表示当前任务的状态是新创建的,尚未执行 private static final int NEW = 0; // 表示当前任务即将结束,还未完全结束,值还未写,一种临界状态 private static final int COMPLETING = 1; // 表示当前任务正常结束 private static final int NORMAL = 2; // 表示当前任务执行过程中出现了异常,内部封装的callable.call()向上抛出异常了 private static final int EXCEPTIONAL = 3; // 表示当前任务被取消 private static final int CANCELLED = 4; // 表示当前任务中断中 private static final int INTERRUPTING = 5; // 表示当前任务已中断 private static final int INTERRUPTED = 6; // 我们在使用FutureTask对象的时候,会传入一个Callable实现类或Runnable实现类,这个callable存储的就是 // 传入的Callable实现类或Runnable实现类(Runnable会被使用修饰者设计模式伪装为) private Callable<V> callable; // 正常情况下,outcome保存的是任务的返回结果 // 不正常情况下,outcome保存的是任务抛出的异常 private Object outcome; // 保存的是当前任务执行期间,执行任务的线程的引用 private volatile Thread runner; // 因为会有很多线程去get结果,这里把线程封装成WaitNode,一种数据结构:栈,头插头取 private volatile WaitNode waiters; static final class WaitNode { // 线程对象 volatile Thread thread; // 下一个WaitNode结点 volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
2)构造方法
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { //封装成callable,但返回值为传入的值 this.callable = Executors.callable(runnable, result); this.state = NEW; }
3)核心方法
1.run()方法
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; // 只有当任务状态为new并且runner旧值为null才会执行到这里 try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 调用callable.run()并返回结果 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) // 内部设置outcome为callable执行的结果,并且更新任务的状态为NORMAL(任务正常执行)并且唤醒阻塞的线程 set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) // 如果当前任务处于中断中,则执行这个方法线程会不断让出cpu直到任务处于已中断状态 handlePossibleCancellationInterrupt(s); } }
2.set(V v)方法
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 设置outcome(结果)为callable.run()返回的结果 outcome = v; //修改状态 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state // 唤醒调用get()的所有等待的线程并清空栈 finishCompletion(); } } protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 设置outcome(结果)为callable.run()抛出的异常 outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
3.get()方法
public V get() throws InterruptedException, ExecutionException { int s = state; // 条件成立会调用awaitDone方法自旋等待直到任务完成 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); } //这个方法是真正用来获取任务的返回结果的,这个方法在get()方法里面会被调用,如果该方法被调用,说明任务已经执行完了。 private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
4.awaitDone(boolean timed, long nanos)方法
// 这个方法的作用是等待任务被完成(正常完成或出现异常完成都算完成),被中断,或是被超时 private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { // 如果当前线程出现中断异常,则将该线程代表的WaitNode结点移出栈并抛出中断异常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // 如果当前任务状态大于COMPLETING,说明当前任务已经有结果了(任务完成、中断、取消),直接返回任务状态 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // 当前任务处于临界状态,即将完成,则当前线程释放cpu else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 第一次自旋,如果当前WitNode为null,new一个WaitNode结点 else if (q == null) q = new WaitNode(); // 第二次自旋,如果当前WaitNode节点没有入队,则尝试入队 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q); // 第三次自旋,到这里表示是否定义了超时时间 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } // 未超出时间,挂起当前线程一定时间 LockSupport.parkNanos(this, nanos); } else // 挂起当前线程,该线程会休眠(什么时候该线程会继续执行呢?除非有其他线程调用unpark()或者中断该线程) LockSupport.park(this); } }
5.finishCompletion()方法
//任务执行完成(正常结束和非正常结束都代表任务执行完成)会调用这个方法来唤醒所有因调用get()方法而陷入阻塞的线程。 private void finishCompletion() { // 如果条件成立,说明当前有陷入阻塞的线程 for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; // 执行到这里说明还有因调用get()而陷入阻塞的线程,自旋接着唤醒 // 这里q.next设置为null帮助GC(垃圾回收) q.next = null; // unlink to help gc q = next; } break; } } //拓展方法 done(); // 将callable设置为null,方便GC callable = null; }
【3】注意事项
1)当 for 循环批量获取 Future 的结果时容易 block,get 方法调用时应使用 timeout限制
2)Future 的生命周期不能后退。一旦完成了任务,它就永久停在了“已完成”的状态,不能从头再来
3)FutureTask 一般是结合线程池使用,然后额外采用FutureTask获取结果。
【4】Future的局限性
从本质上说,Future表示一个异步计算的结果。它提供了isDone()来检测计算是否已经完成,并且在计算结束后,可以通过get()方法来获取计算结果。在异步计算中,Future确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制:
1)并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;
2)无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
3)无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
4)没有异常处理:Future接口中没有关于异常处理的方法;
了解CompletionService接口
【1】介绍
1)CompletionService 接口是一个独立的接口,并没有扩展 ExecutorService 。 其默认实现类是ExecutorCompletionService;
2)接口CompletionService 的功能是:以异步的方式一边执行未完成的任务,一边记录、处理已完成任务的结果。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序了。
3)简单来说,CompletionService 就是监视着 Executor线程池执行的任务,用 BlockingQueue 将完成的任务的结果存储下来。(当然,这个也可以是程序员自己去实现,但是要不断遍历与每个任务关联的 Future,然后不断去轮询,判断任务是否已经完成,比较繁琐);
【2】源码展示
public interface CompletionService<V> { //提交一个 Callable 任务;一旦完成,便可以由take()、poll()方法获取 Future<V> submit(Callable<V> task); //提交一个 Runnable 任务,并指定计算结果; Future<V> submit(Runnable task, V result); //获取并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。 Future<V> take() throws InterruptedException; //获取并移除表示下一个已完成任务的 Future,如果不存在这样的任务,则返回 null。 Future<V> poll(); //获取并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则将等待指定的时间(如果有必要) Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; }
了解ExecutorCompletionService类(CompletionService接口的实现类)
【1】介绍
1)内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序,内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果。
【2】源码分析
1)属性分析
//线程池 private final Executor executor; //判断线程池是否继承抽象类 private final AbstractExecutorService aes; //阻塞队列 private final BlockingQueue<Future<V>> completionQueue;
2)构造方法
//对于线程池必须定义,而阻塞队列会有默认的 //而默认的LinkedBlockingQueue对于并发编程来说是存在隐患的(依据阿里手册来说,因为队列的无尽性会导致OOM) //所以一般考虑要你自己去定义阻塞队列 public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; //如果是继承了抽象类的实现 this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; }
3)阻塞队列元素的定义
private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } //FutureTask里面的拓展方法,在run的时候会被调用,所以是做完任务了会自动提交到队列里面 protected void done() { completionQueue.add(task); } private final Future<V> task; }
4)实现接口的方法
//采用newTaskFor来封装非标准的取消 //因为传入的Callable或Runnable,这种不是FutureTask,故需要封装 private RunnableFuture<V> newTaskFor(Callable<V> task) { if (aes == null) return new FutureTask<V>(task); else return aes.newTaskFor(task); } private RunnableFuture<V> newTaskFor(Runnable task, V result) { if (aes == null) return new FutureTask<V>(task, result); else return aes.newTaskFor(task, result); } //下面是对接口定义的方法的实现 public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; } public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f; } public Future<V> take() throws InterruptedException { return completionQueue.take(); } public Future<V> poll() { return completionQueue.poll(); } public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException { return completionQueue.poll(timeout, unit); }
【3】汇总说明
1)说白了就是基于FutureTask 是单线程的任务,考虑可以等待获取返回结果,那么应该可以采用线程池的方法形成多任务并发的结果。
2)故定义了CompletionService接口作为规范,ExecutorCompletionService类作为具体的实现类【作为管理者】,不然每次采用线程池来做的话都要自己定义去管理。
3)当需要批量提交异步任务的时候建议你使用CompletionService。CompletionService将线程池Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单。
4)CompletionService能够让异步任务的执行结果有序化。先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如Forking Cluster这样的需求。
5)线程池隔离。CompletionService支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。
【4】示例展示
1)示例代码
public class CompletionServiceDemo { public static void main(String[] args) throws InterruptedException, ExecutionException { //创建线程池 ExecutorService executor = Executors.newFixedThreadPool(10); //创建CompletionService CompletionService<Integer> cs = new ExecutorCompletionService<>(executor); //异步向电商S1询价 cs.submit(() -> getPriceByS1()); //异步向电商S2询价 cs.submit(() -> getPriceByS2()); //异步向电商S3询价 cs.submit(() -> getPriceByS3()); //将询价结果异步保存到数据库 for (int i = 0; i < 3; i++) { //从阻塞队列获取futureTask Integer r = cs.take().get(); executor.execute(() -> save(r)); } executor.shutdown(); } private static void save(Integer r) { System.out.println("保存询价结果:{}"+r); } private static Integer getPriceByS1() throws InterruptedException { TimeUnit.MILLISECONDS.sleep(5000); System.out.println("电商S1询价信息1200"); return 1200; } private static Integer getPriceByS2() throws InterruptedException { TimeUnit.MILLISECONDS.sleep(8000); System.out.println("电商S2询价信息1000"); return 1000; } private static Integer getPriceByS3() throws InterruptedException { TimeUnit.MILLISECONDS.sleep(3000); System.out.println("电商S3询价信息800"); return 800; } }
了解CompletableFuture
【1】介绍
1)简单的任务,用Future获取结果还好,但我们并行提交的多个异步任务,往往并不是独立的,很多时候业务逻辑处理存在串行[依赖]、并行、聚合的关系。如果要我们手动用 Fueture 实现,是非常麻烦的。
2)CompletableFuture是Future接口的扩展和增强。CompletableFuture实现了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。更为重要的是,CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。
3)CompletableFuture除了实现Future接口还实现了CompletionStage接口。
4)CompletionStage接口: 执行某一个阶段,可向下执行后续阶段。异步执行,默认线程池是ForkJoinPool.commonPool()。
【2】常用方法
1)描述依赖关系:
1.thenApply() 把前面异步任务的结果,交给后面的Function
2.thenCompose()用来连接两个有依赖关系的任务,结果由第二个任务返回
2)描述and聚合关系:
1.thenCombine:任务合并,有返回值
2.thenAccepetBoth:两个任务执行完成后,将结果交给thenAccepetBoth消耗,无返回值。
3.runAfterBoth:两个任务都执行完成后,执行下一步操作(Runnable)。
3)描述or聚合关系:
1.applyToEither:两个任务谁执行的快,就使用那一个结果,有返回值。
2.acceptEither: 两个任务谁执行的快,就消耗那一个结果,无返回值。
3.runAfterEither: 任意一个任务执行完成,进行下一步操作(Runnable)。
4)并行执行:
1.CompletableFuture类自己也提供了anyOf()和allOf()用于支持多个CompletableFuture并行执行
【3】创建异步操作
1)CompletableFuture 提供了四个静态方法来创建一个异步操作:
public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
2)这四个方法区别在于:
1.runAsync 方法以Runnable函数式接口类型为参数,没有返回结果,supplyAsync 方法Supplier函数式接口类型为参数,返回结果类型为U;Supplier 接口的 get() 方法是有返回值的(会阻塞)。
2.没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。
3.默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。
3)supplyAsync的两种获取结果的方法join&get
1.join()和get()方法都是用来获取CompletableFuture异步之后的返回值。join()方法抛出的是uncheck异常(即未经检查的异常),不会强制开发者抛出。get()方法抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch)
【3】常用方法的使用与介绍
1)结果处理
1.介绍:
//当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的 Action。主要是下面的方法: public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) //Action的类型是BiConsumer<? super T,? super Throwable>,它可以处理正常的计算结果,或者异常情况。 //方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。 //这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常
2.示例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } if (new Random().nextInt(10) % 2 == 0) { int i = 12 / 0; } System.out.println("执行结束!"); return "test"; }); //whenComplete一般搭配exceptionally一起使用,一个处理结果,一个处理异常 future.whenComplete(new BiConsumer<String, Throwable>() { @Override public void accept(String t, Throwable action) { System.out.println(t+" 执行完成!"); } }); future.exceptionally(new Function<Throwable, String>() { @Override public String apply(Throwable t) { System.out.println("执行失败:" + t.getMessage()); return "异常xxxx"; } });
2)结果转换
1.介绍:所谓结果转换,就是将上一段任务的执行结果作为下一阶段任务的入参参与重新计算,产生新的结果。
2.方法列举:
【1】thenApply
1.说明
//thenApply 接收一个函数作为参数,使用该函数处理上一个CompletableFuture 调用的结果,并返回一个具有处理结果的Future对象。 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
2.示例
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { int result = 100; System.out.println("一阶段:" + result); return result; }).thenApply(number -> { int result = number * 3; System.out.println("二阶段:" + result); return result; });
【2】thenCompose
1.说明
//thenCompose 的参数为一个返回 CompletableFuture 实例的函数,该函数的参数是先前计算步骤的结果。 public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn); public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
2.示例
CompletableFuture<Integer> future = CompletableFuture .supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(30); System.out.println("第一阶段:" + number); return number; } }) .thenCompose(new Function<Integer, CompletionStage<Integer>>() { @Override public CompletionStage<Integer> apply(Integer param) { return CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = param * 2; System.out.println("第二阶段:" + number); return number; } }); } });
3.说明:
【1】thenApply 和 thenCompose的区别
1.thenApply 转换的是泛型中的类型,返回的是同一个CompletableFuture;
2.thenCompose 将内部的 CompletableFuture 调用展开来并使用上一个CompletableFutre 调用的结果在下一步的 CompletableFuture 调用中进行运算,是生成一个新的CompletableFuture。
3.示例
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> result1 = future.thenApply(param -> param + " World"); CompletableFuture<String> result2 = future .thenCompose(param -> CompletableFuture.supplyAsync(() -> param + " World")); System.out.println(result1.get());
3)结果消费
1.介绍:
【1】与结果处理和结果转换系列函数返回一个新的 CompletableFuture 不同,结果消费系列函数只对结果执行Action,而不返回新的计算值。
【2】根据对结果的处理方式,结果消费函数又分为:
thenAccept系列:对单个结果进行消费
thenAcceptBoth系列:对两个结果进行消费
thenRun系列:不关心结果,只对结果执行Action
2.方法列举:
【1】thenAccept
1.说明
//通过观察该系列函数的参数类型可知,它们是函数式接口Consumer,这个接口只有输入,没有返回值。 public CompletionStage<Void> thenAccept(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
2.示例
CompletableFuture<Void> future = CompletableFuture .supplyAsync(() -> { int number = new Random().nextInt(10); System.out.println("第一阶段:" + number); return number; }).thenAccept(number -> System.out.println("第二阶段:" + number * 5));
【2】thenAcceptBoth
1.说明
//thenAcceptBoth 函数的作用是,当两个 CompletionStage 都正常完成计算的时候,就会执行提供的action消费两个异步的结果。 public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action); public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
2.示例
CompletableFuture<Integer> futrue1 = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(3) + 1; try { TimeUnit.SECONDS.sleep(number); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第一阶段:" + number); return number; } }); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(3) + 1; try { TimeUnit.SECONDS.sleep(number); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第二阶段:" + number); return number; } }); futrue1.thenAcceptBoth(future2, new BiConsumer<Integer, Integer>() { @Override public void accept(Integer x, Integer y) { System.out.println("最终结果:" + (x + y)); } });
【3】thenRun
1.说明
//thenRun 也是对线程任务结果的一种消费函数,与thenAccept不同的是,thenRun 会在上一阶段 CompletableFuture 计算完成的时候执行一个Runnable,Runnable并不使用该 CompletableFuture 计算的结果。 public CompletionStage<Void> thenRun(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action);
2.示例
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { int number = new Random().nextInt(10); System.out.println("第一阶段:" + number); return number; }).thenRun(() -> System.out.println("thenRun 执行"));
4)结果组合
1.方法列举:
【1】thenCombine
1.说明
//thenCombine 方法,合并两个线程任务的结果,并进一步处理。 public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
2.示例
CompletableFuture<Integer> future1 = CompletableFuture .supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(10); System.out.println("第一阶段:" + number); return number; } }); CompletableFuture<Integer> future2 = CompletableFuture .supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(10); System.out.println("第二阶段:" + number); return number; } }); CompletableFuture<Integer> result = future1 .thenCombine(future2, new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer x, Integer y) { return x + y; } });
5)任务交互
1.介绍:所谓线程交互,是指将两个线程任务获取结果的速度相比较,按一定的规则进行下一步处理。
2.方法列举:
【1】applyToEither
1.说明
//两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的转化操作。 public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
2.示例
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(10); System.out.println("第一阶段start:" + number); try { TimeUnit.SECONDS.sleep(number); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第一阶段end:" + number); return number; } }); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(10); System.out.println("第二阶段start:" + number); try { TimeUnit.SECONDS.sleep(number); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第二阶段end:" + number); return number; } }); future1.applyToEither(future2, new Function<Integer, Integer>() { @Override public Integer apply(Integer number) { System.out.println("最快结果:" + number); return number * 2; } });
【2】acceptEither
1.说明
//两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的消费操作。 public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
2.示例
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(10) + 1; try { TimeUnit.SECONDS.sleep(number); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第一阶段:" + number); return number; } }); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(10) + 1; try { TimeUnit.SECONDS.sleep(number); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第二阶段:" + number); return number; } }); future1.acceptEither(future2, new Consumer<Integer>() { @Override public void accept(Integer number) { System.out.println("最快结果:" + number); } });
【3】runAfterEither
1.说明
//两个线程任务相比较,有任何一个执行完成,就进行下一步操作,不关心运行结果。 public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
2.示例
CompletableFuture<Integer> future1 = CompletableFuture .supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(5); try { TimeUnit.SECONDS.sleep(number); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第一阶段:" + number); return number; } }); CompletableFuture<Integer> future2 = CompletableFuture .supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(5); try { TimeUnit.SECONDS.sleep(number); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第二阶段:" + number); return number; } }); future1.runAfterEither(future2, new Runnable() { @Override public void run() { System.out.println("已经有一个任务完成了"); } }).join();
【4】runAfterBoth
1.说明
//两个线程任务相比较,两个全部执行完成,才进行下一步操作,不关心运行结果。 public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
2.示例
CompletableFuture<Integer> future1 = CompletableFuture .supplyAsync(new Supplier<Integer>() { @Override public Integer get() { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第一阶段:1"); return 1; } }); CompletableFuture<Integer> future2 = CompletableFuture .supplyAsync(new Supplier<Integer>() { @Override public Integer get() { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第二阶段:2"); return 2; } }); future1.runAfterBoth(future2, new Runnable() { @Override public void run() { System.out.println("上面两个任务都执行完成了。"); } });
【5】anyOf
1.说明
//anyOf 方法的参数是多个给定的 CompletableFuture,当其中的任何一个完成时,方法返回这个 CompletableFuture。 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
2.示例
Random random = new Random(); CompletableFuture<String> future1 = CompletableFuture .supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(random.nextInt(5)); } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }); CompletableFuture<String> future2 = CompletableFuture .supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(random.nextInt(1)); } catch (InterruptedException e) { e.printStackTrace(); } return "world"; }); CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2);
【6】allOf
1.说明
//allOf方法用来实现多 CompletableFuture 的同时返回。 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
2.示例
CompletableFuture<String> future1 = CompletableFuture .supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("future1完成!"); return "future1完成!"; }); CompletableFuture<String> future2 = CompletableFuture .supplyAsync(() -> { System.out.println("future2完成!"); return "future2完成!"; }); CompletableFuture<Void> combindFuture = CompletableFuture .allOf(future1, future2); try { combindFuture.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
【4】CompletableFuture常用方法总结: