Java线程池ThreadPoolExector的源码分析

时间:2022-05-13 21:07:08

前言:线程是我们在学习java过程中非常重要的也是绕不开的一个知识点,它的重要程度可以说是java的核心之一,线程具有不可轻视的作用,对于我们提高程序的运行效率、压榨CPU处理能力、多条线路同时运行等都是强有力的杀手锏工具。线程是如此的重要,那么我们来思考这样一个问题。假设我们有一个高并发,多线程的项目,多条线程在运行的时候,来一个任务我们new一个线程,任务结束了,再把它销毁结束,这样看似没有问题,适合于低并发的场景,可是当我们的项目投入到生产环境,一下涌入千条任务的时候,线程不断的new执行,JVM不断的回收,这样重复这个过程,并且任务过多,一下子new不过来,任务就会有遗漏,可能发生这样的情况,任务执行了但是却没有返回结果,并且很容易发生宕机。这是我们绝对不愿意看到的场景。所以,我们为什么不思考一下,然后用一个管理池把所有的线程管理起来呢?那么线程池就应运而生了。本期的博客就来聚焦线程池的源码,走进线程池的内部,看看它究竟是如何工作的,使用线程池到底能带给我们什么好处?我们为什么要使用它?

本篇博客的目录:

一:线程池的介绍

二:使用线程池的好处

三:线程池的分类

四:ThreadPooolExector类的实现源码分析

五:关于线程池的总结

一:线程池的介绍

1: 前言中我们已经说明了单线程创建处理任务的时候的场景的弊端,线程池的出现,就是专门为了解决这一问题应运而生的,它解决了由于线程创建太过频繁而发生的性能损耗,采用了线程处理完任务而继续保持留在线程池内的机制,而勉去重新创建新线程的这一特别耗费时间和资源的开销。线程池的定义如下:线程池是指管理一组工作线程的资源池,线程池是与工作队列密切相关的,其中在工作队列中宏保存了所有等待执行的任务,从工作队列中获取一个任务,执行任务,然后在线程池中并等待一下个任务。

2:线程池的特点:线程池是专门用来管理线程的,首先呢,它本身创建出来的时候,里面是没有线程的。只有当有任务进来的时候,它才会去创建线程去处理任务线程,这时候线程池的大小就处于一个增长的过程。在这里注意,有两个线程的概念,一个是工作线程,一个是任务线程。当然我们建立一个池,不能让它无限制的增长,因此它具有以下几个属性:coreSize、maxiumSize、BlockingQueue,其中coreSize我们暂且先翻译为核心池大小,它是一个int的数值类型,表示的是创建线程的个数,这是一个临界值,当线程池中的数量超过coreSize个大小的时候,它就会将任务放在阻塞队列中,等到线程中的数目降到coreSize大小之下,然后它会从队列中取,继续执行任务。当然也会有任务数量比较多的情况,这时候就会创建maxiumSize个线程数了,注意:maxiumSize是它的最大值,超过这个数,线程池就要对其进行采取拒绝策略了。

我们用图来表示一下这个过程:

Java线程池ThreadPoolExector的源码分析

二:使用线程池的好处

我总结了一下,主要有以下几点:

1:线程统一管理,线程池具有创建线程和销毁线程的能力,线程集中在一起比起分散开来,更加便于管理

2:重用现有的线程而不是创建新线程,可以在处理多个请求的时候分摊线程创建和销毁过程中产生的巨大开销

3:当请求到达的时候,工作线程通常已经存在,因此不会由于等待创建线程而延迟任务的执行,从而提高了响应性

4:可以创建足够多的线程便于处理器保持忙碌状态,而对这些足够多多线程数量又进行了限制,会防止其溢出,耗尽处理器内存

5:可以防止多线程相互争夺资源而使应用程序而产生的并发问题

三:线程池的分类

从jdk1.5开始中,api就提供了灵活的线程池以及一些配置,我们可以调用Executors中的静态工厂方法来创建一个线程池,Executors是一个接口,它的子类是ExecutorService,然后有一个具体的实现类:ThreadPoolExector.关于线程池,它有以下几种不同的分类:

1:newFixedThreadPool

Fixed这个词的意思是固定不变的,newFixedThreadPool将创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的数量,这时线程池的大小将不会变化。如果线程在执行的过程中发生异常,这个时候线程池就会补充一个线程

2: newCachedThreadPool

  newCachedThreadPool将会创建一个可缓存的线程池,如果线程池的当前数量超过了线程池,那么线程池将回收空闲的线程,如果任务很多的时候,它将会继续创建线程而不会存在任何的限制。

3: newSingleThreadExector

newSingleThreadExector将只会创建一个线程来执行任务,这点有点类似于node.js,采用单线程的机制,如果这个线程异常结束,那么会再重新创建一个线程,它始终保证在线程池里的只有一个线程,这样就可以保证任务在队列总是按照预定的顺序执行。

4: newScheduledThreadPool

newScheduledThreadPool创建一个固定长度的线程池,而且是以延迟或者定时的方式来执行任务,这点有点类似于Timer类

四:ThreadPooolExector类的实现源码分析

1:为什么要讲ThreadPoolExector类?

Exector是ThreadPoolExector的祖父类接口,ThreadPoolExector的直接父类接口是ExectorService,而我们所讲的第三点,其中的不同线程池的分类其实都是Exector中的方法,而在ThreadPoollExector中得到了实现,所以我们要构建的不同种类的线程池主要还是依赖这个类完成,接下来我们就聚焦ThreadPoolExector来看其具体的实现方法

2:ThreadPoolExector的源码讲解

2.1:同样的,它是一个类,首先我们来看它有哪些全局变量

   private final BlockingQueue<Runnable> workQueue;//存放线程的队列。这里是一个阻塞队列

    private final ReentrantLock mainLock = new ReentrantLock();//维持一个可重入锁

    private final Condition termination = mainLock.newCondition();//获取锁的状态

    private final HashSet<Worker> workers = new HashSet<Worker>();//存放Worker类的集合

    private volatile long  keepAliveTime;//保持存活的时间

    private volatile boolean allowCoreThreadTimeOut;//是否允许核心线程超时

    private volatile int   corePoolSize;//核心池大小

    private volatile int   maximumPoolSize;//最大线程池的大小

    private volatile int   poolSize;//池的大小

    private volatile RejectedExecutionHandler handler;//注入执行的处理器

    private volatile ThreadFactory threadFactory;//线程工厂,主要是用来生产线程

    private int largestPoolSize;//最大线程大小

    private long completedTaskCount;//已执行完成的任务数量

    private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy(); //拒绝策略类

从中,我们也可以看出我们在线程池介绍中谈到的关于coreSize和maxiumSize等参数,这些int值对线程池的中的线程池数量进行了限制,还有一些关于锁ReentrantLock

的类,这是一个可重入锁,它的主要目的是锁住其操作,因为线程的操作要保证其原子性,防止冲突发生,所以在其源码中很多都对其进行了上锁操作。还有一个很重要的值的全局的变量state:

volatile int runState;//运行状态
static final int RUNNING = 0;//0表示正在运行中
static final int SHUTDOWN = 1;//1表示关闭
static final int STOP = 2;//2表示停止
static final int TERMINATED = 3;//3表示结束

这些状态值是线程池目前所处环境的状态的体现,它采用int数字来表现,记住这些值很重要,因为后面有很多方法调用线程池的运行状态,有很多对其值进行判断。采用volatile修饰,也保证其在线程池中是随时保证对其他线程的可见,防止并发过程中未知异常的发生。

2.2:ThreadPoolExector的构造函数

     public ThreadPoolExecutor(int corePoolSize,                  //核心池大小
int maximumPoolSize, //池的最大大小
long keepAliveTime, //保持活动的时间
TimeUnit unit, //时间单位
BlockingQueue<Runnable> workQueue) { //阻塞队列
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
} public ThreadPoolExecutor(int corePoolSize,//核心池大小
int maximumPoolSize,//池的最大大小
long keepAliveTime,//保持存活的时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//阻塞队列
ThreadFactory threadFactory) {//线程工厂
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,//调用其他构造函数
threadFactory, defaultHandler);
} public ThreadPoolExecutor(int corePoolSize,//核心池的大小
int maximumPoolSize,//最大池的大小
long keepAliveTime,//保持唤醒的时间
TimeUnit unit,//时间单元
BlockingQueue<Runnable> workQueue,//阻塞队列
RejectedExecutionHandler handler) {//注入的执行处理器
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,//调用其他函数
Executors.defaultThreadFactory(), handler);
} public ThreadPoolExecutor(int corePoolSize,//核心池的大小
int maximumPoolSize,//最大池大小
long keepAliveTime,//保持存活的时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//阻塞队列
ThreadFactory threadFactory,//线程工厂
RejectedExecutionHandler handler) {//注入的执行处理器
if (corePoolSize < 0 || //如果核心池的大小小于0
maximumPoolSize <= 0 || //最大池大小小于0
maximumPoolSize < corePoolSize || //最大池大小不小于核心池大小
keepAliveTime < 0) //保持唤醒的时间小于0
throw new IllegalArgumentException();//抛出主题违法异常
if (workQueue == null || threadFactory == null || handler == null)//如果队列为null或者线程工厂为null或者处理器为null
throw new NullPointerException();//抛出空指针异常
this.corePoolSize = corePoolSize;//构造核心池大小
this.maximumPoolSize = maximumPoolSize;//最大池的大小
this.workQueue = workQueue;//工作队列
this.keepAliveTime = unit.toNanos(keepAliveTime);//以保持的存活时间构造存活时间
this.threadFactory = threadFactory;//构造线程工厂
this.handler = handler;//处理器
}

可以看出ThreadPoolExector一共有四个构造函数,但是最后调用的都是最后一个,我们可以只看最后一个,它主要有核心池大小、最大池大小、存活时间、时间单位、阻塞队列、线程工厂这几个参数,其中又对其进行了值范围的检查,如果参数违法就抛出异常,然后构造进去。关于这几个参数,随着后面我们对其方法的讲解,会理解越来越深刻的。

2.3:ThreadPool的主要方法讲解

2.3.1:我们先来看一下最重要的方法,执行任务的方法:

    public void execute(Runnable command) {//执行方法
if (command == null)//如果任务线程为null
throw new NullPointerException();//抛出空指针异常
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {//如果线程池的大小大于核心池的大小或者线程池没有保证在coreSize下
if (runState == RUNNING && workQueue.offer(command)) {//如果运行状态是正在运行并且阻塞队列中把线程任务放进去
if (runState != RUNNING || poolSize == 0)//如果运行状态不是正在运行或者池的大小是0
ensureQueuedTaskHandled(command);//确保队列中的任务被处理
}
else if (!addIfUnderMaximumPoolSize(command))//没有在保证最大的线程池大小下添加线程任务
reject(command); // 调用注入方法
}
}

这是执行任务的方法,其中可以看出当任务线程为null的时候,就抛出异常,然后如果池的大小大于核心池的大小,也就是说此时它要将任务放在阻塞队列中,这个时候它采用的是BlockingQueue的offer方法,将任务放进去,同时保证状态是正在运行。因为线程池的状态随时可能被修改,所以处处得判断。添加进去以后,如果状态不是在运行中,或者池里没有线程,就调用ensureQueueTaskHandled处理任务,我们来看看这个方法的源代码:

    private void ensureQueuedTaskHandled(Runnable command) {//确保队列中的任务被处理
final ReentrantLock mainLock = this.mainLock;//主锁
mainLock.lock();//上锁
boolean reject = false;//设置reject为false
Thread t = null;//声明一个线程t
try {
int state = runState;//获取运行状态
if (state != RUNNING && workQueue.remove(command))//如果状态不是在运行中,在队列中移除线程
reject = true;//注入设置为true
else if (state < STOP &&//如果状态小于停止(这时候int的值作用就发挥了,看一下小于stop的有几?stop是2,小于它也就是0和1,就表示运行和结束,上面的if说明了不是0,那么这里就表示1,也就是结束) 并且 如果池的大小大于1并且队列不是空,也就是还存在任务在队列中没有执行完
poolSize < Math.max(corePoolSize, 1) &&
!workQueue.isEmpty())
t = addThread(null);//添加一个线程,设置参数为null
} finally {
mainLock.unlock();//最后解锁
}
if (reject)//如果注入为ture
reject(command);//注入线程
else if (t != null)//如果线程不为null
t.start();//线程开始运行
}

其中可以看到如果线程池的状态是关闭的时候,这时候队列也会移除任务,然后调用addThread方法,并且参数为null:

    private Thread addThread(Runnable firstTask) {//添加线程
Worker w = new Worker(firstTask);//工作类新构造一个线程
Thread t = threadFactory.newThread(w);//用线程工厂生产一个新类
if (t != null) {//如果生产出来的类不是null
w.thread = t;//设置一个线程t
workers.add(w);//工作线程添加
int nt = ++poolSize;//池的大小增加
if (nt > largestPoolSize)//如果线程池的大小大于最大池的大小
largestPoolSize = nt;//把增加后的池的大小赋给最大池大小
}
return t;//返回该线程
}

我们来到addThread方法,此时参数为null,Worker类此时会构建一个null,我们再来看Worker的构造方法,然后看threadFactory,也就是线程工厂的方法

Worker(Runnable firstTask) {//构造一个任务线程
this.firstTask = firstTask;
}

Java线程池ThreadPoolExector的源码分析

从中可以看出,ThreadFactory只是进行了一个对创建线程的封装,如果传入的参数为null,那么它也会返回null,那么addThread就会返回null,我们再回去看看,也就是说如果其线程池状态为停止,那么阻塞队列会移除其中的任务,并不再执行任务线程。好了,我们再回过头,看看按照正常逻辑怎么走。看一下addIfUnderCorePoolSize的源码:

private boolean addIfUnderCorePoolSize(Runnable firstTask) {//保证在核心池的大小下添加任务
Thread t = null;//线程为null
final ReentrantLock mainLock = this.mainLock;//主锁
mainLock.lock();//开始上锁
try {
if (poolSize < corePoolSize && runState == RUNNING)//如果池的大小小于核心池大小并且是正在运行状态
t = addThread(firstTask);//把第一个任务添加进去
} finally {
mainLock.unlock();//最后解锁
}
if (t == null)//如果线程是null的话
return false;//返回false
t.start();//线程开始执行
return true;//返回true
}

这个方法主要是说如果线程数还未超过coreSize的时候,此时就会通过线程工厂生产出一个线程然后执行任务,这是绝大多数的线程的状态。那么此时就还剩一种状态,也就是在maxiumSize下,这个时候,会调用addIfUnderMaximumPoolSize方法,我们来分析一下这个方法的源码:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {//添加任务在未超过池的最大容量时
Thread t = null;//声明一个线程
final ReentrantLock mainLock = this.mainLock;//开启一个主锁
mainLock.lock();//开始上锁
try {
if (poolSize < maximumPoolSize && runState == RUNNING)//如果池的大小小于最大池的大小并且运行状态是在运行中
t = addThread(firstTask);//调用添加线程的方法添加线程
} finally {
mainLock.unlock();//最后一定解锁
}
if (t == null)//如果线程为null
return false;//返回false
t.start();//线程开始运行
return true;//返回true
}

这里也是对其池的大小进行了判断,如果其符合小于最大池大小并且线程池状态是正在运行,那么就生产线程去执行任务。

总结:execute方法主要是对线程数在不同的大小的判断,并且是时刻判断线程池的运行状太,如果是正在运行,那么就会生产线程去执行任务,一旦超过coresize就放在阻塞队列中去,未超过maxiumSize也会继续执行,如果线程池状态是停止的话,那么此时阻塞队列会移除任务,并且不再创建线程去执行任务。

2.3.2:ThreadPoolExector中的内部类Worker的讲解

Worker作为ThreadPoolExector中的一个内部类,它也继承了Runnable接口,所以其本质上是一个线程,按照我们分析源码的技巧,先来看看它的全局变量:

       private final ReentrantLock runLock = new ReentrantLock();//新建一个可重入锁

        private Runnable firstTask;//第一个任务线程

        volatile long completedTasks;//已经完成的任务数

        Thread thread;//声明一个线程

从中可以看出它的任务,而Woker其实也就是线程池中运行的工作线程,这个类中主要是对线程的打断和对线程池的关闭等一些列的方法操作,这里我们拿出两个方法来讲,一个是shutdown方法,顾名思义也就是关闭线程池,我们来看一下具体的代码:

public void shutdown() {   //关闭方法

    SecurityManager security = System.getSecurityManager();//获取系统安全管理器
if (security != null)//如果安全接口不为null
security.checkPermission(shutdownPerm);//检查权限 final ReentrantLock mainLock = this.mainLock;//获取可重入锁
mainLock.lock();//开始上锁
try {
if (security != null) { //如果安全接口不为null
for (Worker w : workers)
security.checkAccess(w.thread);
} int state = runState;
if (state < SHUTDOWN)//如果状态小于关闭,那么肯定是running或者介于running和关闭之间
runState = SHUTDOWN;//状态设置为关闭 try {
for (Worker w : workers) {//遍历整个工作线程
w.interruptIfIdle();//如果是空闲的就打断
}
} catch (SecurityException se) { // Try to back out
runState = state;
// tryTerminate() here would be a no-op
throw se;
} tryTerminate(); // Terminate now if pool and queue empty
} finally {
mainLock.unlock();
}
}

可以看出结束方法首先调用的是系统安全接口,这主要的目的是为了在系统层面对线程池进行保护,防止其发生意外。比如中断系统进程等,获取了安全管理器之后接下来再对其进行权限检查,然后就是加锁控制。接下来就是取状态进行判断了,接着遍历循环整个线程池里的工作线程,一旦发现空闲的就进行打断终止。然后是调用tryTerminate方法,我们来看看其中的源码:

    private void tryTerminate() { //结束
if (poolSize == 0) {
int state = runState; //取当前运行状态
if (state < STOP && !workQueue.isEmpty()) {//如果运行状态小于stop,也就是有运行和关闭的可能并且队列不为空
state = RUNNING; //运行状态设置为Running
Thread t = addThread(null);//不创造线程
if (t != null)//
t.start();
}
if (state == STOP || state == SHUTDOWN) {
runState = TERMINATED;
termination.signalAll();
terminated();
}
}
}

如果线程池里没有线程了,就取当前的运行状态,然后再阻塞队列中还有线程的情况下,可以看到不创建线程了,此时传入addThread的参数为null,我们来具体看一下:

private Thread addThread(Runnable firstTask) {//添加线程
Worker w = new Worker(firstTask);//工作类新构造一个线程
Thread t = threadFactory.newThread(w);//用线程工厂生产一个新类
if (t != null) {//如果生产出来的类不是null
w.thread = t;//设置一个线程t
workers.add(w);//工作线程添加
int nt = ++poolSize;//池的大小增加
if (nt > largestPoolSize)//如果线程池的大小大于最大池的大小
largestPoolSize = nt;//把增加后的池的大小赋给最大池大小
}
return t;//返回该线程
}

如果参数为null,那会就会返回一个null的线程,可以看出这里将不会产生新的线程,队列中的任务有不会进行处理。然后是通过Conditon.signal方法唤醒所有等待线程,接下来再调用terminated()方法。

 protected void terminated() { }

目前的jdk1.6这个方法是空的,也就是不执行任何操作,那么这个方法就结束了。总结来说就是线程关闭的时候,检查队列中是否还有任务,如果有任务,不再继续创建线程,用原来的线程把其执行完后关闭线程池,状态设置为Terminted,如果没有任务,检查线程池是不是空,如果不是空,把其空间线程全部中断,等待工作的线程处理完成以后关闭整个线程池!

五:总结

本篇博客主要分析了线程池的实现原理,在线程池中很重要和关键的一部分就是对部分参数的理解,还有它的运行原理,作为设计到线程部分的知识,可以看出线程池的每步操作基本上都有一个ReentrantLock可重入锁,这是一种保护机制,它类似于synchronized,但是要比它更强大,并且是建立在java类的基础上,实现更灵活,比如它可以实现中断等待锁、性能更加优良等特性,还有对其状态的控制和操作,主要是通过Worker这个内部类来实现的,具体的可以参考jdk代码,我这里只是起到一个抛砖引玉的作用,本篇博客暂时写到这里。