Java异步编程CompletableFuture(串行,并行,批量执行)-四、CompletableFuture常用方法介绍

时间:2024-11-12 19:06:01

消息打印小工具类(用不用都行)

主要观察和学习CompletableFuture方法

import java.util.StringJoiner;
/**
 * 小工具
 */
@Slf4j
public class SmallTool {

    /** 
     * 睡眠
     */ 
    public static void sleepMillis(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** 
     * 打印 message
     */ 
    public static void printTimeAndThread(String tag) {
        String result = new StringJoiner("\t|\t")
                .add(String.valueOf(System.currentTimeMillis()))
                .add(String.valueOf(Thread.currentThread().getId()))
                .add(Thread.currentThread().getName())
                .add(tag)
                .toString();
        System.out.println(result);
    }
}

4.1 串行

4.1.1 前一个返回有结果再执行下一个 thenCompose 、 thenApply()、thenApplyAsync、 thenAccept() 、thenRun()

作用都是使得一个阶段的结果可以作为下一个阶段的输入

尽管它们的目标相同,但它们在使用方式和功能上有一些区别。

  • thenCompose()thenApply()方法用于将前一个阶段的结果与下一个阶段进行组合
  • thenApplyAsync()方法在此基础上实现异步执行
  • thenAccept()方法用于处理前一个阶段的结果而不需要返回值
  • thenRun()方法用于在前一个阶段完成后执行一些操作

不同的方法适用于不同的场景,可以根据实际需求选择合适的方法来组合异步计算。

    public static void main(String[] args) {
        SmallTool.printTimeAndThread("小白进入餐厅");
        SmallTool.printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭");

        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            SmallTool.printTimeAndThread("厨师炒菜");
            SmallTool.sleepMillis(200);
            return "番茄炒蛋";
        }).thenCompose(dish -> CompletableFuture.supplyAsync(() -> {
            SmallTool.printTimeAndThread("服务员打饭");
            SmallTool.sleepMillis(100);
            return dish + " + 米饭";
        }));
			
        SmallTool.printTimeAndThread("小白在打王者");
        SmallTool.printTimeAndThread(String.format("%s 好了,小白开吃", cf1.join()));
    }

类似
在这里插入图片描述

4.2 并行

4.2.1两个同时执行,等待结果一起返回thenCombine()

    public static void main(String[] args) {
        SmallTool.printTimeAndThread("小白进入餐厅");
        SmallTool.printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭");

        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            SmallTool.printTimeAndThread("厨师炒菜");
            SmallTool.sleepMillis(200);
            return "番茄炒蛋";
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            SmallTool.printTimeAndThread("服务员蒸饭");
            SmallTool.sleepMillis(300);
            return "米饭";
        }), (dish, rice) -> {
            SmallTool.printTimeAndThread("服务员打饭");
            SmallTool.sleepMillis(100);
            return String.format("%s + %s 好了", dish, rice);
        });

        SmallTool.printTimeAndThread("小白在打王者");
        SmallTool.printTimeAndThread(String.format("%s ,小白开吃", cf1.join()));

    }

4.2.2 两个任务都完成了,不关注执行结果的进行下一步操作runAfterBoth/runAfterBothAsync

 public static void main(String[] args) {
        SmallTool.printTimeAndThread("小白进入餐厅");
        SmallTool.printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭");

        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            SmallTool.printTimeAndThread("厨师炒菜");
            SmallTool.sleepMillis(200);
            return "番茄炒蛋";
        });
        cf1.runAfterBoth(CompletableFuture.supplyAsync(() -> {
            SmallTool.printTimeAndThread("服务员蒸饭");
            SmallTool.sleepMillis(300);
            return "米饭";
        }), () -> {
            SmallTool.printTimeAndThread("厨师和服务员都执行完了");
        }).join();
        SmallTool.printTimeAndThread("结束");
    }

在这里插入图片描述

4.2.3 两个任务并行进行用快的那个的结果作为后续处理applyToEither\applyToEitherAsync

    public static void main(String[] args) {
        SmallTool.printTimeAndThread("张三走出餐厅,来到公交站");
        SmallTool.printTimeAndThread("等待 700路 或者 800路 公交到来");

        CompletableFuture<String> bus = CompletableFuture.supplyAsync(() -> {
            SmallTool.printTimeAndThread("700路公交正在赶来");
            SmallTool.sleepMillis(1);
            return "700路到了";
        }).applyToEither(CompletableFuture.supplyAsync(() -> {
            SmallTool.printTimeAndThread("800路公交正在赶来");
            SmallTool.sleepMillis(3);
            return "800路到了";
        }), firstComeBus -> firstComeBus);

        SmallTool.printTimeAndThread(String.format("%s,小白坐车回家", bus.join()));
    }

在这里插入图片描述

4.3 异常处理

如果在原始的supplyAsync()任务中发生一个错误,这时候没有任何thenApply()会被调用并且future将以一个异常结束。如果在第一个thenApply发生错误,这时候第二个和第三个将不会被调用,同样的,future将以异常结束。

4.3.1 使用 exceptionally() 回调处理异常

  • exceptionally()当前一个阶段的计算出现异常时,会执行该函数,并将异常作为输入参数。你可以在这里记录这个异常并返回一个默认值。
public static void main(String[] args) {
        SmallTool.printTimeAndThread("张三走出餐厅,来到公交站");
        SmallTool.printTimeAndThread("等待 700路 或者 800路 公交到来");

        CompletableFuture<String> bus = CompletableFuture.supplyAsync(() -> {
            SmallTool.printTimeAndThread("700路公交正在赶来");
            SmallTool.sleepMillis(100);
            return "700路到了";
        }).applyToEither(CompletableFuture.supplyAsync(() -> {
            SmallTool.printTimeAndThread("800路公交正在赶来");
            SmallTool.sleepMillis(200);
            return "800路到了";
        }), firstComeBus -> {
            SmallTool.printTimeAndThread(firstComeBus);
            if (firstComeBus.startsWith("700")) {
                throw new RuntimeException("撞树了……");
            }
            return firstComeBus;
        }).exceptionally(e -> {
            SmallTool.printTimeAndThread(e.getMessage());
            SmallTool.printTimeAndThread("小白叫出租车");
            return "出租车 叫到了";
        });

        SmallTool.printTimeAndThread(String.format("%s,小白坐车回家", bus.join()));
    }

在这里插入图片描述如果上方不好理解,可以看下面方法

public static void main(String[] args) {
		Integer age = -1;
		
		CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
		    if(age < 0) {
		        throw new IllegalArgumentException("Age can not be negative");
		    }
		    if(age > 18) {
		        return "Adult";
		    } else {
		        return "Child";
		    }
		}).exceptionally(ex -> {
		    System.out.println("Oops! We have an exception - " + ex.getMessage());
		    return "Unknown!";
		});
		
		System.out.println("Maturity : " + maturityFuture.get()); 
}

4.3.2 使用 handle() 方法处理异常

API提供了一个更通用的方法 - handle()从异常恢复,无论一个异常是否发生它都会被调用。

public static void main(String[] args) {
		Integer age = -1;
		CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
		    if(age < 0) {
		        throw new IllegalArgumentException("Age can not be negative");
		    }
		    if(age > 18) {
		        return "Adult";
		    } else {
		        return "Child";
		    }
		}).handle((res, ex) -> {
		    if(ex != null) {
		        System.out.println("Oops! We have an exception - " + ex.getMessage());
		        return "Unknown!";
		    }
		    return res;
		});
		
		System.out.println("Maturity : " + maturityFuture.get());
}

4.3.3 使用 whenComplete()/whenCompleteAsync() 方法处理异常

whenComplete()handle()方法在功能上确实有一些相似之处。它们都可以在异步计算完成后执行一些操作,并且都可以处理结果或异常。

区别:

  • whenComplete()主要用于执行副作用操作,没有返回值
  • handle()主要用于处理结果和异常,并返回一个新的结果

根据具体需求,可以选择使用不同的方法来满足需求。

参考:

	//有异常时执行exceptionally()
    CompletableFuture.runAsync(() -> System.out.println("执行自己的操作"), asyncTaskExecutor)
             .exceptionally(e -> {
                 log.info("获取浙农码失败{}", e.getMessage());
                 return null;
             });
     //需要返回值
     CompletableFuture.runAsync(() -> System.out.println("执行自己的操作"), asyncTaskExecutor)
             .handle((result,ex) -> {
                 if (ex != null) log.info("品种推荐同步执行失败", ex);
                 return result;
             });
     //没有返回值
     CompletableFuture.runAsync(() -> System.out.println("执行自己的操作"), asyncTaskExecutor)
             .whenComplete((result,ex) -> {
                 if (ex != null) log.info("品种推荐同步执行失败", ex);
             });

4.4 批量执行(组合多个CompletableFuture)

  • 我们使用thenCompose() thenCombine()把两个CompletableFuture组合在一起。现在如果你想组合任意数量的CompletableFuture,应该怎么做?我们可以使用以下两个方法组合任意数量的CompletableFuture。
    static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
    static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
    

区别

  • CompletableFuture.allOf()返回一个CompletableFuture<Void>对象,表示所有传入的CompletableFuture都已经完成
  • completableFuture.anyOf()返回一个CompletableFuture<Object>对象,表示任意一个传入的CompletableFuture对象已经完成

4.4.1 CompletableFuture.allOf()

所有传入的CompletableFuture都已经完成

案例中你只需关注allOf()方法的使用就可以了

没有返回值
    public static void main(String[] args) {

        SmallTool.printTimeAndThread("小白和小伙伴们 进餐厅点菜");
        // 点菜
        ArrayList<Dish> dishes = new ArrayList<>();
        for (int i = 1; i <= 12; i++) {
            Dish dish = new Dish("菜" + i, 1);
            dishes.add(dish);
        }
        // 做菜
        ArrayList<CompletableFuture> cfList = new ArrayList<>();
        for (Dish dish : dishes) {
            CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> dish.make());
            cfList.add(cf);
        }
        // 等待所有任务执行完毕
        CompletableFuture.allOf(cfList.toArray(new CompletableFuture[cfList.size()])).join();
    }

使用Lambda表达式(和上面逻辑一样)

 public static void main(String[] args) {

        SmallTool.printTimeAndThread("小白和小伙伴们 进餐厅点菜");
   			CompletableFuture[] dishes = IntStream.rangeClosed(1, 12)
                .mapToObj(i -> new Dish("菜" + i, 1))
                .map(dish -> CompletableFuture.runAsync(dish::make))
                .toArray(size -> new CompletableFuture[size]);

        CompletableFuture.allOf(dishes).join();
    }
  • 使用IntStream.rangeClosed(1, 12)创建了一个整数流,表示1到12之间的数字
  • 使用mapToObj方法将每个数字转换为一个Dish对象。这里使用了一个自定义的Dish类,表示一个菜品,其中包含菜品名称和制作所需时间
  • 使用map方法将每个Dish对象转换为一个CompletableFuture对象。这里使用了CompletableFuture.runAsync方法,该方法会接收一个Runnable参数,并在后台线程中异步执行Runnable的任务。这里使用了方法引用dish::make,表示异步执行Dish对象的make方法。
  • 最后,使用toArray方法将生成的CompletableFuture对象存储在一个CompletableFuture数组中。该数组将用于后续操作。
有返回值
  @Slf4j
public class OrderJob  {
    
    @Autowired
    private businessServie businessServie;
    
    @Resource(name ="asyncTaskExecutor")
    private ThreadPoolTaskExecutor asyncTaskExecutor;

    public void execute() {
        //1.从表中查询出1000条订单数据处理
        List<Order> orderList = businessServie.selectList();
        
        //每条子线程处理数据的数量
        int perCount = 100;
        //拆分集合,每个100
        List<List<Order>> partitions = Lists.partition(orderList, perCount);
        log.info("批量处理多线程开始,本次处理的订单数量:{}",orderList.size());
        log.info("多线程数量:{}",partitions.size());

        List<CompletableFuture> futures = Lists.newArrayList();
        for(List<Order> dataList : partitions){
            List<Order> finalDataList = dataList;
            CompletableFuture<List<Order>> future = CompletableFuture.supplyAsync(() -> createOrder(finalDataList),asyncTaskExecutor);
            futures.add(future);
        }
        //等待所有线程执行完成
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
        //获取线程返回结果,封装集合
        List<Order> updateList = new ArrayList<>();

        futures.stream().forEach(future->{
            try {
                List<Order> list = (List<Order>) future.get();
                if(CollectionUtils.isNotEmpty(list)){
                    updateList.addAll(list);
                }
            }catch (Exception e){
                log.error("获取多线程返回结果数据异常",e);
            }
        });
        businessServie.updateBatchList(updateList);
    }
}

4.4.2 CompletableFuture.anyOf()

任意一个传入的CompletableFuture对象已经完成

//调用5次同时执行,等待最快返回的结果
    List<CompletableFuture<String>> futures = new ArrayList<>();
     for (int i = 0; i < 5; i++) {
         int finalI = i;
         CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
             String result = HttpRequest.post(urlRequestId)
                     .body(JSON.toJSONString(body))
                     .headerMap(header, true)
                     .timeout(3000)
                     .execute()
                     .body();
             log.info("获取第三方接口信息");
             return result;
         });
         futures.add(future);
     }

     // 等待任意一个CompletableFuture完成
     CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()]));
     String join = (String) anyFuture.join();

这里也可以设置等待时间

等待任意一个CompletableFuture完成,最多等待5秒,如果在指定的时间内没有得到结果,就会抛出TimeoutException异常。

try {
    String join = (String) anyFuture.get(5, TimeUnit.SECONDS);
    // 处理获取到的结果
} catch (TimeoutException e) {
    // 处理超时异常
    System.out.println("获取结果超时");
    // 其他操作
}

在这里插入图片描述