Java 线程池 Executor 框架

时间:2022-05-05 10:00:28

在Java中,可以通过new Thread 的方法来创建一个新的线程执行任务,但是线程的创建是非常耗时的,而且创建出来的新的线程都各自运行、缺乏统一的管理,这样的后果是可能导致创建过多的线程从而过度消耗系统的资源,最终导致性能急剧下降,线程池的引入就是为了解决这些问题。

所谓线程池就是将多个线程放在一个池子里面(所谓的池化技术),然后需要线程的时候不是创建一个线程,而是从线程池里面获取一个可用的线程,然后执行我们的任务。线程池的关键在于它为我们管理了多个线程,我们不需要关心如何创建线程,我们只需要关心我们的额核心业务,然后需要线程来执行任务的时候从线程池中获取线程。任务执行完后线程就不会被撤销,而是会被重新放到池子里面,等待机会去执行任务。

当使用进程池控制进程的数量时,其他进程排队等候,当一个任务执行完毕后,再从队列中取最前面的任务开始执行。如果队列中没有等待进程,那么线程池中的这一资源会处于等待状态。当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了,否则,进入等待队列。

线程池的优势:

① 线程池提供了一种简易的多线程编程方案,我们不需要投入过多精力去管理多个线程,线程池会帮我们管理好,它知道什么时候做什么事情,我们只要在需要的时候去获取就可以了。

② 线程池可以防止因创建过多的线程而导致的性能急剧下降。创建和销毁线程的代价是非常昂贵的,甚至我们创建和销毁线程的时间比实际执行任务的时间还要长,这显然是不合理的,线程池中的线程可以重复使用,减少了创建和销毁线程的次数。

③ 通过线程池限制线程数量,也可以减少线程过度切换的尴尬境地

Executor 框架

Executor 框架是一种将线程的创建和执行分离的机制。该框架基于 Executor 接口和 ExecutorService 接口,以及这两个接口的实现类 ThreadPoolExecutor 展开。Executor 有一个内部线程池,并提供了将任务传递到池中线程获得执行的方法,可传递的任务有如下两种:通过 Runnable 接口实现的任务、通过 Callable 接口实现的任务。在这两种情况下,只需要传递任务到执行器,执行器便可使用线程池中的线程或者新创建的线程来执行。执行器也决定了任务执行的时间。

Java 的线程既是工作单元,又是执行机制。工作单元包括 Runnable 和 Callable,而执行机制由 Executor 框架提供。

Executor 框架采用两级调度模型,在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。

Executor 框架的结构主要由 3 大部分组成:

1)任务。包括被执行任务需要实现的接口:Runnable 接口和 Callable 接口

2)任务的执行。包括任务执行机制的核心接口 Executor,以及继承自 Executor 的 ExecutorService 接口。Executor 框架有两个关键类实现了 ExecutorService 接口,即 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor

3)异步计算的结果。包括接口 Future 和实现 Future 接口的 FutureTask 类

Executor 框架的成员

ThreadPoolExecutor  // 通常使用工厂类 Executors 来创建,Executors 可以创建3种类型的 ThreadPoolExecutor:
FixedThreadPool
SingleThreadExecutor
CachedThreadPool ScheduledThreadPoolExecutor // 通常使用工厂类 Executors 来创建,Executors 可以创建2种类型的 ScheduledThreadPoolExecutor:
ScheduledThreadPoolExecutor
SingleThreadScheduledExecutor Future 接口 Runnable 接口和 Callable 接口

ThreadPoolExecutor 详解

ThreadPoolExecutor 类是 Executor 框架最核心的类

ScheduledThreadPoolExecutor 详解

该类继承自 ThreadPoolExecutor,主要用来在给定的延迟之后运行任务,或者定期执行任务。

FutureTask 详解

Future 接口和实现 Future 接口的 FutureTask 类,代表异步计算的结果。

FutureTask 除了实现 Future 接口外,还实现了 Runnable 接口。因此,FutureTask 可以交给 Executor 执行,也可以由调用线程直接执行,即调用 run() 方法。

线程池相关的接口和类都在 java.util.concurrent 包中。其类图关系如下:

Java 线程池 Executor 框架

Executor 接口是一个顶层接口,在它里面只声明了一个方法 executor(Runnable),返回值类型为 void,参数为 Runnable 类型,从字面意思可以理解就是用来执行传进去的任务的。

ExecutorService 接口继承了 Executor 接口,并又声明了一些方法:submit、invokeAll、invokeAny、shutdown等

AbstractExecutorService 抽象类实现了 ExecutorService 接口,基本实现了 ExecutorService 中的抽象方法。

ThreadPoolExecutor 类集成了 AbstractExecutorService 抽象类,是线程池的核心实现类,用来执行被提交的任务。

ScheduledThreadPoolExecutor 类可以在给定的延迟后运行命令,或者定期执行命令。

Future 接口和实现 Future 接口的 FutureTask 类,代表异步计算的结果。

Runnable 接口和 Callable 接口的实现类,都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。

ThreadPoolExecutor 类是线程池的核心类中几个非常重要的方法

 execute(Runnable)  // 在Executor接口中声明的方法,在ThreadPoolExecutor类中进行了具体的实现。该方法是ThreadPoolExecutor类的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
submit() // 在ExecutorService接口中声明的方法,在AbstractExecutorService抽象类中就有了具体的实现,在ThreadPoolExecutor中并没有对齐覆写,该方法也是用来向线程池提交任务的,但是他和execute()方法不同,它能够通过Future获得任务执行的结果,
shutdown() // 关闭线程池
shutdownNow() // 关闭线程池

线程池的使用

线程池的创建

使用 ThreadPoolExecutor 类的构造方法来创建一个线程

new ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)

向线程池提交任务

execute() 方法用于提交不需要返回值的任务

submit() 方法用于提交需要返回值的任务,该方法会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 对象的 get() 方法获取返回值,get() 方法会阻塞当前线程直到任务完成。

关闭线程池

调用 ThreadPoolExecutor 的 shutdown() 或 shutdownNow() 方法

Executors 工具类

java.util.concurrent.Executors 类是一个工具类,功能就是采用工厂模式创建不同类型的线程池。

Java提供了四种线程池的实现方法(Executors 工具类类的四个静态方法):

 public static ExecutorService newSingleThreadExecutor()  // 创建一个单线程的线程池,它只会用唯一个工作线程来执行任务,所有任务按照指定顺序执行。如果这个唯一的线程因为异常结束,会有一个新的线程来替代它。
public static ExecutorService newFixedThreadPool(int nThreads) // 创建一个定长的线程池,可控制线程的最大并发数,超出的线程会在队列中等待。使用这个线程池的时候,必须根据实际情况估算出线程的数量、
public static ExecutorService newCachedThreadPool() // 创建一个可缓存的线程池,如果线程池长度超过处理任务的需要,可灵活回收空闲线程,如果线程长度满足不了任务需求,会新建线程。此线程池不会对线程池的大小做限制,线程池的大小完全依赖于JVM能够创建的线程数量。使用这种方式需要在代码运行的过程中通过控制并发任务的数量来控制线程的数量。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) // 创建一个定长且支持定时及周期性执行任务的线程池。

newSingleThreadExecutor

 package com.test.multiThread.threadPool;

 import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class ThreadPoolDemo {
public static void main(String[] args){
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++){
final int index = i;
singleThreadExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println(index); try{
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}

newFixedThreadPool

 package com.test.multiThread.threadPool;

 import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class ThreadPoolDemo {
public static void main(String[] args){
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++){
final int index = i;
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(index);
try{
Thread.sleep(1000);
} catch (InterruptedException e){
e.printStackTrace();
}
}
});
}
}
}

newCachedThreadPool

 package com.test.multiThread.threadPool;

 import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class ThreadPoolDemo {
public static void main(String[] args){
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++){
final int index = i;
try {
Thread.sleep(index * 1000);
}catch (InterruptedException e){
e.printStackTrace();;
}
cachedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(index);
}
});
}
}
}

newScheduledThreadPool

 package com.test.multiThread.threadPool;

 import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; public class ThreadPoolDemo {
public static void main(String[] args){
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new Runnable() {
@Override
public void run() {
System.out.println("delay 3 seconds");
}
}, 3, TimeUnit.SECONDS);
}
}

这四个创建线程池的方法最终都会调用统一的ThreadPoolExecutor类的构造方法

 public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,){
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors,defaultThradFactory(), defaultHandler);
}
具体来说只是这几个值的不同决定了4四个线程池的不同:
corePoolSize:核心池的大小,当线程池当前的个数大于核心线程池的时候,线程池会回收多出来的线程
maximumPoolSize:线程池最大线程数量,当线程池需要执行的任务数量大于核心线程池时,会创建更多的线程,但是做多不能超过这个数
keepAliveTime:标识空余的线程的存活时间。当活跃线程数大于核心线程数,空闲的多余线程最大存活时间
unit:存活时间的单位
workQueue:存放任务的队列,阻塞对列,用于存放所有待执行的Runnable对象
handler:超出线程范围(maximumPoolSize)和队列容量的任务的处理程序

通常使用线程池时,都是直接 线程池实例对象.execute(Runnable) 该方法返回void

一句话可以概括,线程池就是用一对包装住Thread的Worker类的集合,在里面有条件的进行着死循环,从而可以不断接受任务来进行。