【java并发编程实战】第六章:线程池

时间:2021-12-23 07:24:31

1.线程池

众所周知创建大量线程时代价是非常大的:
- 线程的生命周期开销非常大:创建需要时间,导致延迟处理请求,jvm需要分配空间。
- 资源消耗:线程需要占用空间,如果线程数大于可用的处理器数量,那么线程就会闲置,这给Gc造成压力。线程在竞争cpu的时候也会造成性能开销,所以线程不是越多越好,使用不当并不会增加运行效率。
- 稳定性:使用不当会造成内存溢出。

继而就引申出线程池

1.1线程池的创建

Executors框架:静态方法创建

  • newCachedThreadPool:创建可变线程数的线程池。有空闲的时候会回收。
  • newFixedThreadPool:创建固定线程数的线程池。
  • newScheduledThreadPool:创建固定数量线程池,以延时、定时的方式执行。
  • newSingleThreadExecutor:创建单线程线程池。

Executors框架:

生命周期:运行->关闭->已终止
> Executor扩展了ExecutorSerivice接口。提供了平缓关闭接口的方法。调用shutdown方法后会拒绝新增任务,进入关闭状态。等待所有任务处理完进入已终止。showdownNow会立刻进入已终止。调用awaitTermination会立刻调用shutdown等待关闭后返回同步关闭状态。
【java并发编程实战】第六章:线程池

2.callable和future

future异步返回线程计算结果。callable可以以泛型的形式显示定义返回异步返回类型。
下面介绍一个例子:

public class MyCallable implements Callable<String> {

    @Override
    public String call() throws Exception {
        Thread.sleep(1000L);
        //return the thread name executing this callable task
        return Thread.currentThread().getName();
    }

    public static void main(String args[]){
        //Get ExecutorService from Executors utility class, thread pool size is 10
        ExecutorService executor = Executors.newFixedThreadPool(10);
        //create a list to hold the Future object associated with Callable
        Queue<Future<String>> list = new LinkedBlockingQueue<>();
        //Create MyCallable instance
        Callable<String> callable = new MyCallable();
        for(int i=0; i< 100; i++){
            //submit Callable tasks to be executed by thread pool
            Future<String> future = executor.submit(callable);
            //add Future to the list, we can get return value using Future
            list.add(future);
        }
        while (true) {
            Future fut = list.poll();
            try {
                if (fut.isDone()) {
                    //print the return value of Future, notice the output delay in console
                    // because Future.get() waits for task to get completed
                    System.out.println(new Date()+ "::"+fut.get());
                } else {
                    list.add(fut);
                }
            } catch (InterruptedException | ExecutionException | CancellationException e ) {
                e.printStackTrace();
            }
        }
    }

}

其中,当任务执行完成,调用get会返回Exception。如果么有完成,那么进入阻塞状态直至返回。如果执行过程中抛出异常,Executors会封装成ExecutionException重新抛出,通过getCause获取原始异常。如果任务被取消,就会抛出CancellationException。

3.completionService

轮询的方式太低效,可以考虑completionService

public class MyCallable implements Callable<String> {

    @Override
    public String call() throws Exception {
        Thread.sleep(1000L);
        return Thread.currentThread().getName();
    }

    public static void main(String args[]){
        //

        ExecutorService executor = Executors.newFixedThreadPool(10);

        ExecutorCompletionService<String> stringExecutorCompletionService = new ExecutorCompletionService<>(executor);


        Queue<Future<String>> list = new LinkedBlockingQueue<>();
        Callable<String> callable = new MyCallable();
        for(int i=0; i< 100; i++){
            Future<String> future = stringExecutorCompletionService.submit(callable);
            list.add(future);
        }
        while (true) {
            try {
                Future<String> take = stringExecutorCompletionService.take();
                String s = take.get();
                System.out.println(s);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

}

// 待补充分析源码。

4.为线程的返回值设置时限。

Future.class
【java并发编程实战】第六章:线程池
如果get超过等待时间讲抛出timeoutexception