(五) Java多线程详解之Callable和Future阻塞线程同步返回结果

时间:2021-10-13 14:38:19

概念

Callable接口是JDK5后新增的接口,它提供了一个call()方法可以作为线程的执行体。与Runnable接口的run()方法相比它更为强大,主要体现在call()方法可以有返回值,并且call()方法可以声明抛出异常。需要注意的是Callable不是Runnable的子接口,所以Callable对象不能直接作为Thread的target。Future可以用于获取call()方法计算的结果 判断任务是否完成 中断任务等

FutureTask实现了Runnable和Future接口,所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值,那么这个组合的使用有什么好处呢?假设有一个很耗时的返回值需要计算,并且这个返回值不是立刻需要的话,那么就可以使用这个组合,用另一个线程去计算返回值,而当前线程在使用这个返回值之前可以做其它的操作,等到需要这个返回值时,再通过Future得到

使用Callable和Future

public class ThreadExample12 {
    public static void main(String[] args) {
        FutureTask<String> future = new FutureTask<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(3000);
                return "Hello World";
            }
        });

        new Thread(future, "有返回值的线程").start();

        try {
            System.out.println("子线程的返回值:" + future.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行代码输出”子线程的返回值:Hello World”,通过future.get()方法阻塞等待线程返回结果

下面来看另一种方式使用Callable和Future,通过ExecutorService的submit方法执行Callable,并返回Future

public class ThreadExample12 {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newSingleThreadExecutor();
        @Override
        Future<String> future = threadPool.submit(new Callable<String>() {
            public String call() throws Exception {
                Thread.sleep(3000);
                return "Hello World";
            };
        });

        try {
            System.out.println("子线程的返回值:" + future.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Future取得的结果类型和Callable返回的结果类型必须一致,这是通过泛型来实现的。返回的Future对象可以取消任务,通过调用Future的future.cancel(true)方法实现,Future对象还可以对超时线程进行关闭,通过future.wait(3)如果线程耗时超过3秒则抛出异常

在这里有人可能会有疑问,既然要同步返回结果那我为什么不直接在主线程执行要再新建一个子线程呢?这是因为在新建的子线程在执行过程中,主线程还可以执行它的复杂业务逻辑(可查阅Future模式相关资料)。并且这只是局限于在一个线程时,如果需要同时执行多个线程等待多个线程返回结果时,Callable和Futurede的优势会更明显,可以看一下实例代码,就能明白了:

public class ThreadExample13 {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool);
        for (int i = 1; i <= 10; i++) {
            final int seq = i;
            completionService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    Thread.sleep(new Random().nextInt(5000));
                    return seq;
                }
            });
        }

        for (int i = 0; i < 10; i++) {
            try {
                System.out.println(completionService.take().get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

这个例子是异步执行10个线程同步返回执行结果,如果项目中有用到该功能的同学,建议看看纤程Quasar Fiber,Quasar Fiber功能与之类似但更为强大