Java 中线程池创建的几种方式
首先我们要先知道 Java 中创建线程池的方式,java中创建线程池的方式一般有两种,如下所示:
- 通过Executors工厂方法创建
- 通过new ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)自定义创建
Executors 工厂方法创建
上代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
package com.base.demo.design.play;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @Description: 线程池代码
* @BelongsProject: base-demo-design
* @BelongsPackage: com.base.demo.design.play
* @Author: ChenYongJia
* @CreateTime: 2021-08-14 15:26
* @Email: chen87647213@163.com
* @Version: 1.0
*/
public class TestThreadPoolExecutor {
public static void main(String[] args) {
// 创建使用单个线程的线程池
ExecutorService es1 = Executors.newSingleThreadExecutor();
for ( int i = 0 ; i < 10 ; i++) {
es1.submit( new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在执行任务" );
}
});
}
// 创建使用固定线程数的线程池
ExecutorService es2 = Executors.newFixedThreadPool( 3 );
for ( int i = 0 ; i < 10 ; i++) {
es2.submit( new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在执行任务" );
}
});
}
// 创建一个会根据需要创建新线程的线程池
ExecutorService es3 = Executors.newCachedThreadPool();
for ( int i = 0 ; i < 20 ; i++) {
es3.submit( new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在执行任务" );
}
});
}
// 创建拥有固定线程数量的定时线程任务的线程池
ScheduledExecutorService es4 = Executors.newScheduledThreadPool( 2 );
System.out.println( "时间:" + System.currentTimeMillis());
for ( int i = 0 ; i < 5 ; i++) {
es4.schedule( new Runnable() {
@Override
public void run() {
System.out.println( "时间:" +System.currentTimeMillis()+ "--" +Thread.currentThread().getName() + "正在执行任务" );
}
}, 3 , TimeUnit.SECONDS);
}
// 创建只有一个线程的定时线程任务的线程池
ScheduledExecutorService es5 = Executors.newSingleThreadScheduledExecutor();
System.out.println( "时间:" + System.currentTimeMillis());
for ( int i = 0 ; i < 5 ; i++) {
es5.schedule( new Runnable() {
@Override
public void run() {
System.out.println( "时间:" +System.currentTimeMillis()+ "--" +Thread.currentThread().getName() + "正在执行任务" );
}
}, 3 , TimeUnit.SECONDS);
}
}
}
|
运行结果如下:
pool-1-thread-1正在执行任务
pool-1-thread-1正在执行任务
pool-1-thread-1正在执行任务
pool-1-thread-1正在执行任务
pool-1-thread-1正在执行任务
pool-1-thread-1正在执行任务
pool-1-thread-1正在执行任务
pool-1-thread-1正在执行任务
pool-1-thread-1正在执行任务
pool-1-thread-1正在执行任务
pool-2-thread-1正在执行任务
pool-2-thread-2正在执行任务
pool-2-thread-1正在执行任务
pool-2-thread-3正在执行任务
pool-2-thread-2正在执行任务
pool-2-thread-3正在执行任务
pool-2-thread-1正在执行任务
pool-2-thread-3正在执行任务
pool-2-thread-2正在执行任务
pool-2-thread-1正在执行任务
pool-3-thread-1正在执行任务
pool-3-thread-2正在执行任务
pool-3-thread-2正在执行任务
pool-3-thread-3正在执行任务
pool-3-thread-1正在执行任务
pool-3-thread-3正在执行任务
pool-3-thread-4正在执行任务
pool-3-thread-1正在执行任务
pool-3-thread-3正在执行任务
pool-3-thread-4正在执行任务
pool-3-thread-5正在执行任务
pool-3-thread-4正在执行任务
pool-3-thread-6正在执行任务
pool-3-thread-7正在执行任务
pool-3-thread-8正在执行任务
pool-3-thread-9正在执行任务
pool-3-thread-2正在执行任务
pool-3-thread-6正在执行任务
pool-3-thread-1正在执行任务
pool-3-thread-3正在执行任务
时间:1628926041159
时间:1628926041160
时间:1628926044172--pool-5-thread-1正在执行任务
时间:1628926044172--pool-4-thread-2正在执行任务
时间:1628926044172--pool-4-thread-1正在执行任务
时间:1628926044172--pool-4-thread-2正在执行任务
时间:1628926044172--pool-5-thread-1正在执行任务
时间:1628926044172--pool-4-thread-2正在执行任务
时间:1628926044172--pool-4-thread-1正在执行任务
时间:1628926044172--pool-5-thread-1正在执行任务
时间:1628926044172--pool-5-thread-1正在执行任务
时间:1628926044172--pool-5-thread-1正在执行任务
new ThreadPoolExecutor() 自定义创建
1
2
3
4
5
6
7
|
public ThreadPoolExecutor( int corePoolSize, //线程池的核心线程数量
int maximumPoolSize, //线程池的最大线程数
long keepAliveTime, //当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit, //时间单位
BlockingQueue<Runnable> workQueue, //任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory, //线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler) //拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
|
- corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了 prestartAllCoreThreads() 或者 prestartCoreThread() 方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建 corePoolSize 个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为 0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到 corePoolSize 后,就会把到达的任务放到缓存队列当中;
- maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
- keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于 corePoolSize 时,keepAliveTime 才会起作用,直到线程池中的线程数不大于 corePoolSize,即当线程池中的线程数大于 corePoolSize 时,如果一个线程空闲的时间达到 keepAliveTime,则会终止,直到线程池中的线程数不超过 corePoolSize。但是如果调用了 allowCoreThreadTimeOut(boolean) 方法,在线程池中的线程数不大于 corePoolSize 时,keepAliveTime 参数也会起作用,直到线程池中的线程数为 0;
- unit:参数 keepAliveTime 的时间单位,有7种取值,在 TimeUnit 类中有7种静态属性,如下所示:
1
2
3
4
5
6
7
|
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
|
-
workQueue:阻塞队列。保存等待执行的任务的阻塞队列,当提交一个新的任务到线程池以后, 线程池会根据当前线程池中正在运行着的线程的数量来决定对该任务的处理方式,主要有以下几种处理方式:
- 直接切换:这种方式常用的队列是 SynchronousQueue ,不进行任务存储,直接执行;
- 使用*队列:一般使用基于链表的阻塞队列 LinkedBlockingQueue。如果使用这种方式,那么线程池中能够创建的最大线程数就是 corePoolSize ,而 maximumPoolSize 就不会起作用了(因为是*的)。当线程池中所有的核心线程都是 RUNNING状态 时,这时一个新的任务提交就会放入等待队列中。
- 使用有界队列:一般使用 ArrayBlockingQueue 。使用该方式可以将线程池的最大线程数量限制为 maximumPoolSize ,这样能够降低资源的消耗,但同时这种方式也使得线程池对线程的调度变得更困难,因为线程池和队列的容量都是有限的值,所以要想使线程池处理任务的吞吐率达到一个相对合理的范围,又想使线程调度相对简单,并且还要尽可能的降低线程池对资源的消耗,就需要合理的设置这两个数量。
- PS:ArrayBlockingQueue 和 PriorityBlockingQueue 使用较少,一般使用 LinkedBlockingQueue 和 SynchronousQueue。线程池的排队策略与 BlockingQueue 有关。
- threadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程做些更有意义的事情,比如设置 daemon 和 优先级 等等
- handler:饱和策略。当线程池的阻塞队列已满和指定的线程都已经开启,说明当前线程池已经处于饱和状态了,那么就需要采用一种策略来处理这种情况。表示当拒绝处理任务时的策略,有以下几种取值(也可以根据应用场景需要来实现 RejectedExecutionHandler 接口自定义策略。如记录日志或持久化不能处理的任务。):
AbortPolicy:默认的拒绝策略,直接抛出异常。`throws RejectedExecutionException`。
CallerRunsPolicy:只用调用者所在线程来运行任务(提交任务的线程自己去执行该任务)。
DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。
DiscardPolicy:不处理,直接丢弃任务,没有任何异常抛出。
执行流程:
- 线程池创建线程,会判断当前线程数是否大于 corePoolSize。
- 如果大于则存在缓存队列,缓冲队列存满后会继续创建线程直到 maximumPoolSize ,抛出拒绝的异常。
- 如果小于则创建线程,执行任务,执行完后会从缓存队列中取任务再执行
创建多少线程合适
一般多线程执行的任务类型可以分为 CPU 密集型 和 I/O 密集型,根据不同的任务类型,我们计算线程数的方法也不一样。创建多少线程合适,要看多线程具体的应用场景。我们的程序一般都是 CPU 计算 和 I/O 操作交叉执行 的,由于 I/O 设备 的速度相对于 CPU 来说都很慢,所以大部分情况下,I/O 操作执行的时间相对于 CPU 计算来说都非常长 ,这种场景我们一般都称为 I/O 密集型计算 和 I/O 密集型计算 相对的就是 CPU 密集型计算,CPU 密集型计算 大部分场景下都是 纯 CPU 计算。
- CPU 密集型任务:多线程主要目的是提成CPU利用率,保持和CPU核数一致即可。可以将线程数设置为 N(CPU 核心数)+1 ,比 CPU 核心数 多出来的一个线程是 为了防止线程偶发的缺页中断,或者其它原因导致的 任务暂停 而带来的影响。一旦 任务暂停 ,CPU 就会处于 空闲状态 ,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
- 测试代码如下,结果还是自己亲力亲为吧实践出真知
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
package com.base.demo.design.play;
import java.util.List;
/**
* @Description: CPU测试
* @BelongsProject: base-demo-design
* @BelongsPackage: com.base.demo.design.play
* @Author: ChenYongJia
* @CreateTime: 2021-08-14 16:13
* @Email: chen87647213@163.com
* @Version: 1.0
*/
public class CPUTypeTest implements Runnable {
/**
* 整体执行时间,包括在队列中等待的时间
*/
List<Long> wholeTimeList;
/**
* 真正执行时间
*/
List<Long> runTimeList;
private long initStartTime = 0 ;
/**
* 构造函数
*
* @param runTimeList
* @param wholeTimeList
* @return
* @date 2021/8/14 16:13
* @author ChenYongJia
* @version 1.0
*/
public CPUTypeTest(List<Long> runTimeList, List<Long> wholeTimeList) {
initStartTime = System.currentTimeMillis();
this .runTimeList = runTimeList;
this .wholeTimeList = wholeTimeList;
}
/**
* 判断素数
*
* @param number
* @return boolean
* @date 2021/8/14 16:13
* @author ChenYongJia
* @version 1.0
*/
public boolean isPrime( final int number) {
if (number <= 1 )
return false ;
for ( int i = 2 ; i <= Math.sqrt(number); i++) {
if (number % i == 0 )
return false ;
}
return true ;
}
/**
* 计算素数
*
* @param lower
* @param upper
* @return int
* @date 2021/8/14 16:14
* @author ChenYongJia
* @version 1.0
*/
public int countPrimes( final int lower, final int upper) {
int total = 0 ;
for ( int i = lower; i <= upper; i++) {
if (isPrime(i))
total++;
}
return total;
}
@Override
public void run() {
long start = System.currentTimeMillis();
countPrimes( 1 , 1000000 );
long end = System.currentTimeMillis();
long wholeTime = end - initStartTime;
long runTime = end - start;
wholeTimeList.add(wholeTime);
runTimeList.add(runTime);
System.out.println( "单个线程花费时间:" + (end - start));
}
}
|
I/O 密集型任务:这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N(CPU 核心数)。
- 一般最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目
- 实战:实际需要根据上线情况进行调整优化
- 测试代码如下,结果还是自己亲力亲为吧实践出真知
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
package com.base.demo.design.play;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Vector;
/**
* @Description: IO测试
* @BelongsProject: base-demo-design
* @BelongsPackage: com.base.demo.design.play
* @Author: ChenYongJia
* @CreateTime: 2021-08-14 16:18
* @Email: chen87647213@163.com
* @Version: 1.0
*/
public class IOTypeTest implements Runnable {
/**
* 整体执行时间,包括在队列中等待的时间
*/
Vector<Long> wholeTimeList;
/**
* 真正执行时间
*/
Vector<Long> runTimeList;
private long initStartTime = 0 ;
/**
* 构造函数
*
* @param runTimeList
* @param wholeTimeList
*/
public IOTypeTest(Vector<Long> runTimeList, Vector<Long> wholeTimeList) {
initStartTime = System.currentTimeMillis();
this .runTimeList = runTimeList;
this .wholeTimeList = wholeTimeList;
}
/**
* IO操作
*
* @return void
* @date 2021/8/14 16:18
* @author ChenYongJia
* @version 1.0
*/
public void readAndWrite() throws IOException {
File sourceFile = new File( "D:/test.txt" );
//创建输入流
BufferedReader input = new BufferedReader( new FileReader(sourceFile));
//读取源文件,写入到新的文件
String line = null ;
while ((line = input.readLine()) != null ) {
//System.out.println(line);
}
//关闭输入输出流
input.close();
}
@Override
public void run() {
long start = System.currentTimeMillis();
try {
readAndWrite();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
long end = System.currentTimeMillis();
long wholeTime = end - initStartTime;
long runTime = end - start;
wholeTimeList.add(wholeTime);
runTimeList.add(runTime);
System.out.println( "单个线程花费时间:" + (end - start));
}
}
|
喜欢折腾的同学可以试试(我贴出来的两段 TEST代码):在不同线程数的情况下,run 方法运行时间的差异。可以通过创建不同数量的线程,线程中 new 该 Test 对象(new 两个 List 传到构造参数里)并提交到线程池。查看并归纳计算得出结果。
附:线程池原理
学习线程池的实现原理,有助于你更好地理解内容。
在 HotSpot VM 的线程模型中,Java 线程被一对一映射为内核线程。Java 在使用线程执行程序时,需要创建一个内核线程;当该 Java 线程被终止时,这个内核线程也会被回收。因此 Java 线程的创建与销毁将会消耗一定的计算机资源,从而增加系统的性能开销。
除此之外,大量创建线程同样会给系统带来性能问题,因为内存和 CPU 资源都将被线程抢占,如果处理不当,就会发生内存溢出、CPU 使用率超负荷等问题。
为了解决上述两类问题,Java 提供了线程池概念,对于频繁创建线程的业务场景,线程池可以创建固定的线程数量,并且在操作系统底层,轻量级进程将会把这些线程映射到内核。
线程池可以提高线程复用,又可以固定最大线程使用量,防止无限制地创建线程。当程序提交一个任务需要一个线程时,会去线程池中查找是否有空闲的线程,若有,则直接使用线程池中的线程工作,若没有,会去判断当前已创建的线程数量是否超过最大线程数量,如未超过,则创建新线程,如已超过,则进行排队等待或者直接抛出异常。
最后
在不同的业务场景以及不同配置的部署机器中,线程池的线程数量设置是不一样的。其设置不宜过大,也不宜过小,要根据具体情况,计算出一个大概的数值,再通过实际的性能测试,计算出一个合理的线程数量。
到此这篇关于Java线程池大小设置的文章就介绍到这了,更多相关Java线程池大小设置内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://juejin.cn/post/6997566049155547166