Java高并发 -- J.U.C.组件扩展

时间:2021-07-10 16:36:37

Java高并发 -- J.U.C.组件扩展

主要是学习慕课网实战视频《Java并发编程入门与高并发面试》的笔记

FutureTask

Future模式,核心思想是异步调用。和同步调用的区别在于:如果某个任务A非常耗时,异步调用下,被调者可以立即返回,然后着手处理其他任务,不用在这个任务A上等待。等到真正需要任务A的结果了再尝试去获取。

Future模式,有点类似淘宝购物,假如买一部手机,从付款到收到货可能需要三天,这三天不需要傻傻等待,因为付款后会有一个订单,而这个订单是我买了这件东西、可以凭这个单号取东西的一个凭证。所以你一旦得到了订单,完全可以放下这件事取忙别的,到时候去拿快递就行了。Future模式的异步调用中,立即返回的一个值就类似于这里说的订单了(而不是你买的手机),然后就可以着手处理其他任务了,这就充分利用了等待时间。等其他任务处理完了,再根据拿到的这个类似订单的值,取到真实的数据(手机)。

FutureTask实现了RunnaleFuture接口,而RunnaleFuture实现了Runnale和Future接口。下面先以Future + Callable为例

package com.shy.concurrency.aqs;

import java.util.concurrent.*;

/**
 * @author Haiyu
 * @date 2019/1/4 10:28
 */
public class FutureTaskExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Callable<String> callable = () -> {
            System.out.println("执行任务1");
            Thread.sleep(2000);
            return "Done!";
        };
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        long start = System.currentTimeMillis();
        Future<String> future = executorService.submit(callable);
        System.out.println("执行任务2");
        // get方法会阻塞,直到计算完成才返回
        Thread.sleep(2000);
        String result = future.get();
        long end = System.currentTimeMillis();
        System.out.println(result);
        System.out.println((end - start) / 1000);
        executorService.shutdown();
    }
}

Callable看成任务1,主线程中的任务看成任务2。可以看到Callable任务需要执行2秒,主线程中也需要耗时2秒。在submit后立即返回得到Future对象,让其执行同时主线程也可以执行自己的任务。两个任务同时执行,所以只花费了2s就执行完了两个任务。

下面再使用FutureTask,只需要稍微改动上面的程序即可。

package com.shy.concurrency.aqs;

import java.util.concurrent.*;

/**
 * @author Haiyu
 * @date 2019/1/4 10:28
 */
public class FutureTaskExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Callable<String> callable = () -> {
            System.out.println("执行任务1");
            Thread.sleep(2000);
            return "Done!";
        };

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        long start = System.currentTimeMillis();
        FutureTask<String> future = new FutureTask<>(callable);
        executorService.submit(future);
        System.out.println("执行任务2");
        // get方法会阻塞,直到计算完成才返回
        Thread.sleep(2000);
        String result = future.get();
        long end = System.currentTimeMillis();
        System.out.println(result);
        System.out.println((end - start) / 1000);
        executorService.shutdown();
    }
}

Fork/Join

fork也就是分支、分叉的意思,可以将大任务分解成小任务;join表示等待的意思,必须等待fork后的小任务执行完毕,得到执行后的部分结果,才能将部分结果合并成最终结果。

比如计算1到100的和,规定当区间小于10时可以直接计算,否则就分成左右两个分支,如此递归地将[1, 100]区间划分成若干个小区间;每个小区间计算部分和,等待这些部分和的结果都计算完毕,最后将其全部合并,得到最终的结果。

通常一个物理线程需要处理多个逻辑任务,所以每一个线程都有一个任务队列。若线程A的任务都执行完了,B还有很多任务没执行,此时A会“帮助”B执行它的任务,A帮助B执行B的任务时,从队列的尾部拿数据;而B自己执行任务时从队列头部拿数据,这就像是两个指针一个往左移动一个往右移动,避免了A、B之间对数据的竞争。

JDK中有ForkJoinPool,该接口有个方法public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)

ForkJoinTask支持fork()join()方法,它有两个重要的子类,没有返回值的RecursiveAction和有返回值的RecursiveTask,它们都有个方法compute(),在这个方法中进行主要的计算。对于RecursiveAction来说签名是void,而对于RecursiveTask来说有返回值所以签名是<T>

下面以计算1~100的和为例

package com.shy.concurrency.aqs;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * @author Haiyu
 * @date 2019/1/4 10:49
 */
public class ForkJoinExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(new CountTask(1, 100));
        int res = forkJoinTask.get();
        System.out.println(res);
    }
}

class CountTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 10;
    private int start;
    private int end;

    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        boolean canCompute = end - start < THRESHOLD;
        if (canCompute)  {
            int sum = 0;
            for (int i = start; i <= end ; i++) {
                sum += i;
            }
            return sum;
        } else {
            int mid = (end - start) / 2 + start;
            CountTask left = new CountTask(start, mid);
            CountTask right = new CountTask(mid + 1, end);
            left.fork();
            right.fork();
            int leftRes = left.join();
            int rightRes = right.join();
            return leftRes + rightRes;
        }
    }
}