多线程初阶(九):线程池 & ThreadPoolExecutor & 工厂模式

时间:2024-10-22 16:45:45

目录

1. 线程池

1.1 概念

1.2 作用

2. ThreadPoolExecutor

3. ThreadPoolExecutor 构造方法参数的含义 [面试题]

3.1 int corePoolSize

3.2 int maximumPoolSize

3.3 long keepAliveTime

3.4 TimeUnit unit

3.5 BlockingQueue workQueue

3.6 ThreadFactory threadFactory

3.6.1 工厂模式

3.7 RejectedExecutionHandler handler ★★★

3.7.1 ThreadPoolExecutor.AbortPolicy 

3.7.2 ThreadPoolExecutor.CallerRunsPolicy 

3.7.3 ThreadPoolExecutor.DiscardOldestPolicy

3.7.4 ThreadPoolExecutor.DiscardPolicy 

4. 工厂类Executors

5. 模拟实现线程池


1. 线程池

1.1 概念

什么是线程池呢?

简单来说, 线程池就是将我们提前创建好的线程, 放到"池子"(类似于数组)中, 需要用的时候, 随时去取, 用完了之后, 再放回池子中.

1.2 作用

线程池的作用就是, 让我们能够高效的创建和销毁线程.

大家还记得引入线程的原因吧, 就是因为: 进程是一个比较重的概念, 频繁的创建和销毁进程的开销的太大了, 为了减少开销提高效率, 引入了"轻量级进程" --- 线程.

但是, 随着互联网的发展, 我们对性能的要求也更进一步, 觉得频繁创建销毁线程的开销也很大, 于是就引入了以下两个概念 : 

  1. 线程池
  2. 协程(纤程, 轻量级线程)

由于 协程 是在JDK17时引入的, 目前的使用还没有普及, 这里仅讨论 线程池.

那为啥直接创建线程要比从线程池中取线程的开销更大呢?

要知道, 操作系统由两部分构成:

  1. 内核
  2. 配套的应用程序

内核是操作系统的核心部分, 也包含了操作系统的核心功能 : 1. 向下管理硬件设备 2. 向上给软件提供稳定的运行环境.

而一个操作系统, 只有一个内核 而这一个内核, 要给所有的应用程序提供服务支持.

  • 如果从操作系统创建新线程, 就需要操作系统内核配合完成. (内核要做的事很多, 程序员的代码也干预不了内核, 是不可控的)
  • 如果, 从线程池取现成的线程, 纯应用程序代码就可以完成, 不依赖操作系统. (可控的)
  • 可控的过程要比不可控的过程更加高效.

 所以, 使用线程池, 就可以省下应用程序切换到内核中运行的开销, 提高效率.


2. ThreadPoolExecutor

Java 标准库中也我们提供了线程池: ThreadPoolExecutor类

ThreadPoolExecutor 中准备了一些线程, 可以让这些线程来执行任务.

核心方法: submit(Runnable), 通过 Runnable 描述要执行的任务, 通过 submit 将任务放到 消息队列中, 线程池中的线程就会执行这些任务.

3. ThreadPoolExecutor 构造方法参数的含义 [面试题]

要使用 ThreadPoolExecutor , 肯定要了解它的构造方法, ThreadPoolExecutor 构造方法的参数是比较多的, 我们慢慢往下看.

我们按照第四个构造方法, 依次介绍其中的参数的含义.

 3.1 int corePoolSize

这个参数指的是 核心线程数.

即 线程池中至少的线程数, 线程池一创建, 这些线程也要随之创建, 整个线程池销毁, 这些线程才会销毁(数量是固定的), 是线程池中最核心的线程.

3.2 int maximumPoolSize

这个参数指的是 最大线程数.

核心线程和非核心的线程的总数. 上面说了核心线程的数量是不变的. 

但是非核心线程是自适应的, 在任务繁忙时就创建, 任务不繁忙时就会销毁.

举个例子, 

我们可以把线程池当成一个公司, 把核心线程当做正式员工, 把非核心线程当做实习生, 当公司业务忙的时候就招聘实习生, 等不需要的时候就裁掉实习生.

但是对于公司中的正式员工, 是不能乱裁的.

3.3 long keepAliveTime

这个参数指的是 允许非核心线程空闲的最大时间.

上文说到, 非核心的线程是自适应的, 在任务不繁忙时就会销毁, 而这个参数就是指定了非核心线程的最大空闲时间, 当超过这个时间非核心线程还没有任务可做时, 就会被销毁优化掉.

3.4 TimeUnit unit

这个参数可以指定 keepAliveTime 的时间单位.

TimeUnit是一个枚举类型.

3.5 BlockingQueue<Runnable> workQueue

这个参数就是我们指定的 任务队列.

线程池 本身就是一个生产者消费者模型, submit 就是在生产任务, 线程池中的线程就是在消费任务, 而 任务队列就是起到传递任务的作用.

我们就可以传参来自己指定任务队列, 给我们更大的*度(选择使用数组/链表, 指定 capacity ...), 根据任务来传队列.

3.6 ThreadFactory threadFactory

ThreadFactory 是一个Java 标准库 为线程提供的一个 工厂类, 我们就可以通过这个工厂类来统一的构造并初始化线程池中线程的属性, 简化代码. (虽然工厂模式是为了解决构造方法的缺陷, 但是这里是因为线程池中线程太多了, 以此方便线程池中线程属性的初始化)

什么是工厂类呢? 这里就需要为大家讲一下工厂模式.

3.6.1 工厂模式

工厂模式, 也是设计模式的一种(和单例模式是并列的关系), 是用来解决构造方法的缺陷的一种模式.

举个例子, 当我们描述平面上的一个点的位置时, 可以使用平面直角坐标系表示这个点, 也可以使用极坐标来表示.

当我们想分别用两种方式进行对这个点的构造时, 发现这两个构造方法是构不成重载的:

  

由于连个构造方法的参数列表(个数, 类型)相同, 所以无法构成重载, 那我们就可以通过工厂模式, 创建工程类, 通过工厂方法来完成这个点两种方式的构造.

工厂方法的核心, 就是通过静态方法, 把构造对象 new 的过程 和 各种属性初始化的过程, 封装起来. 而我们可以提供多组静态方法, 实现不同方式的构造.

这个静态的完成属性设置的方法就是工厂方法, 而包含工厂方法的类就是工厂类.


3.7 RejectedExecutionHandler handler ★★★

这个参数是线程池七个参数中最复杂最重要的一个参数. 

这个参数指的是 拒绝策略所对应的对象.

我们知道, submit 把任务添加到消息队列中, 而消息队列是一个阻塞队列, 而当队列满时在添加就会发生阻塞, 一旦发生阻塞, 这个线程就没法执行别的任务, 这不是一个好的现象.

所以在开发中, 我们是不希望程序阻塞的, 这时候我们就需要 拒绝策略.

举个例子 : 一个要相应用户请求的线程阻塞了, 用户就会等很久, 造成了直观上的 "卡顿" , 那这个时候, 应该直接告诉用户 "请求失败", 而不是让用户等这么久.

所以, 对线程池来说, 当队列满时, 再进行入队操作时, 不会真的进行"阻塞", 而是执行拒绝策略相关的代码.

而拒绝策略, 分为以下四种, 每一种是一个类(都是 ThreadPoolExecutor 的静态内部类), 传参时传的也是对应类的对象.

3.7.1 ThreadPoolExecutor.AbortPolicy 

当消息队列满时, 依然有线程 submit 任务时, AbortPolicy 的拒绝策略是直接抛出异常, 使整个线程池都无法继续工作(非但新的任务).

比如, 汤老湿一周上八节课, 课程已经排的很满了.

一天晚上, 鹏哥, 给老湿安排了个直播任务(鹏哥进行了 submit 操作)

老湿若采用 AbortPolicy 的拒绝策略.

那老湿就对鹏哥大哭起来, 说的自己太累了, 心态崩了, 不能上课也不能直播~.

于是 鹏哥就让老湿休息, 于是老湿课也不用上了, 直播任务也不用执行了.

(AbortPolicy 就是 非但新的任务不用做, 旧的任务也执行不了了)

3.7.2 ThreadPoolExecutor.CallerRunsPolicy 

当消息队列满时, 依然有线程 submit 任务时, CallerRunsPolicy 的拒绝策略是自己不会执行, 而让调用 submit 的线程执行任务.

如果老湿采用 CallerRunsPolicy 的拒绝策略,

那么汤老湿对鹏哥说: "鹏哥, 我课实在是多了呀, 抽不出时间呀, 还是你去直播吧~".

于是, 最终老湿没有执行, 而是给老湿安排任务的鹏哥执行了任务.

3.7.3 ThreadPoolExecutor.DiscardOldestPolicy

当消息队列满时, 依然有线程 submit 任务时DiscardOldestPolicy 的拒绝策略是丢弃队列中最老的任务, 执行这次最新 submit 的任务.

如果老湿采用 DiscardOldestPolicy 的拒绝策略,

那汤老湿就是将本班的学生给鸽了, 没有给学生上课, 而是去执行鹏哥安排的直播任务.

3.7.4 ThreadPoolExecutor.DiscardPolicy 

当消息队列满时, 依然有线程 submit 任务时, DiscardPolicy 的拒绝策略是丢弃最新的任务(即不执行当前 submit 的任务).

如果老湿采用 DiscardPolicy 的拒绝策略,

鹏哥就是说 : "你太忙了就不搞了, 以后有空再说"


注意:

上文所说的线程都是指一个进程中的线程.

由于进程间的内存空间是相互独立的, 所以两个进程间的线程无法直接进行交互.

进程1 创建的阻塞队列, 进程2 也是无法进行访问的.


4. 工厂类Executors

由于通过 ThreadPoolExecutor 创建线程池参数处理太繁琐 太麻烦, Java 标准库又基于工厂模式将 ThreadPoolExecutor 进行了封装, 提供了一个工厂类 Executors, 来简化线程池的使用.

可以使用以下两个静态方法构建线程池: 

  • newFixedThreadPool // 核心线程数和最大线程数一样
  • newCachedThreadPool // 最大线程数可以无限增加

 

public static void main(String[] args) {
        // ExecutorService threadPool = Executors.newCachedThreadPool();
        ExecutorService threadPool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 100; i++) {
            int id = i;
            threadPool.submit(() -> {
                System.out.println("hello " + id + " by " + Thread.currentThread().getName());
            });
        }
        // shutdown 能够把线程池里的线程全部关闭,但是不能保证线程池内的任务一定能全部执行完毕。
        // threadPool.shutdown();
        // 所以,如果需要等待线程池内的任务全部执行完毕,需要调用 awaitTermination 方法。
        // threadPool.awaitTermination()
    }

线程池中的线程执行完队列中现有的任务后, 进程是不会结束的. 队列为空, 线程会在 take 处继续阻塞等待新的任务被 submit. 因为线程是前台线程, 所以导致进程也不会结束.

要是想结束进程, 可以选择调用以下两种方法:

  1. shutdown // 把线程池里的线程全部关闭,但是不能保证线程池内的任务一定能全部执行完毕
  2. awaitTermination // 等待线程池内的任务全部执行完毕, 结束进程

5. 模拟实现线程池

要实现线程池, 那我们必须要有一个能够存储任务的任务队列.

线程池中还要有线程, 当队列中有任务被 submit 到队列后, 线程池里的线程会立即执行这些任务.

若队列中没有任务, 则线程池中的线程会阻塞等待, 直到新的任务到来.

class MyThreadPool {
    BlockingQueue<Runnable> queue = null;

    public MyThreadPool(int n) {
        // 任务队列
        queue = new LinkedBlockingQueue<>(1000);
        // 创建 n 个线程
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        Runnable task = queue.take();
                        task.run();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            t.start();
        }
    }
    public void submit(Runnable task) throws InterruptedException {
        queue.put(task);
    }
}

public class Demo29 {
    public static void main(String[] args) throws InterruptedException {
        MyThreadPool myThreadPool = new MyThreadPool(10);
        for (int i = 0; i < 100; i++) {
            int id = i;
            myThreadPool.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " ID = " + id);
            });
        }
    }
}

注意: 当 submit 的任务被线程池中的线程全部执行完后, 进程不会结束, 因为任务队列为空, 线程会在 take 处发生阻塞, 等待新的任务被 submit .(我们手动创建的线程为前台线程, 会阻止进程的结束)

(手动结束进程, 退出码为非0值, 代表进程非正常结束)


END