executors
executors 是一个java中的工具类. 提供工厂方法来创建不同类型的线程池.
从上图中也可以看出, executors的创建线程池的方法, 创建出来的线程池都实现了 executorservice
接口. 常用方法有以下几个:
-
newfixedthreadpool(int threads)
: 创建固定数目线程的线程池, 超出的线程会在队列中等待. -
newcachedthreadpool()
: 创建一个可缓存线程池, 如果线程池长度超过处理需要, 可灵活回收空闲线程(60秒), 若无可回收,则新建线程. -
newsinglethreadexecutor()
: 创建一个单线程化的线程池, 它只会用唯一的工作线程来执行任务, 保证所有任务按照指定顺序(fifo, lifo, 优先级)执行. 如果某一个任务执行出错, 将有另一个线程来继续执行. -
newscheduledthreadpool(int corepoolsize)
: 创建一个支持定时及周期性的任务执行的线程池, 多数情况下可用来替代timer类.
executors 例子
newcachedthreadpool
线程最大数为 integer.max_value
, 当我们往线程池添加了 n 个任务, 这 n 个任务都是一起执行的.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
executorservice cachedthreadpool = executors.newcachedthreadpool();
cachedthreadpool.execute( new runnable() {
@override
public void run() {
for (;;) {
try {
thread.currentthread().sleep( 1000 );
system.out.println(thread.currentthread().getname());
} catch (interruptedexception e) {
e.printstacktrace();
}
}
}
});
cachedthreadpool.execute( new runnable() {
@override
public void run() {
for (;;) {
try {
thread.currentthread().sleep( 1000 );
system.out.println(thread.currentthread().getname());
} catch (interruptedexception e) {
e.printstacktrace();
}
}
}
});
|
newfixedthreadpool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
executorservice cachedthreadpool = executors.newfixedthreadpool( 1 );
cachedthreadpool.execute( new runnable() {
@override
public void run() {
for (;;) {
try {
thread.currentthread().sleep( 1000 );
system.out.println(thread.currentthread().getname());
} catch (interruptedexception e) {
e.printstacktrace();
}
}
}
});
cachedthreadpool.execute( new runnable() {
@override
public void run() {
for (;;) {
try {
thread.currentthread().sleep( 1000 );
system.out.println(thread.currentthread().getname());
} catch (interruptedexception e) {
e.printstacktrace();
}
}
}
});
|
newscheduledthreadpool
三秒执行一次, 只有执行完这一次后, 才会执行.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
scheduledexecutorservice scheduledexecutorservice = executors.newscheduledthreadpool( 5 );
scheduledexecutorservice.schedule( new runnable() {
@override
public void run() {
for (;;) {
try {
thread.currentthread().sleep( 2000 );
system.out.println(thread.currentthread().getname());
} catch (interruptedexception e) {
e.printstacktrace();
}
}
}
}, 3 , timeunit.seconds);
|
newsinglethreadexecutor
顺序执行各个任务, 第一个任务执行完, 才会执行下一个.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
executorservice executorservice = executors.newsinglethreadexecutor();
executorservice.execute( new runnable() {
@override
public void run() {
for (;;) {
try {
system.out.println(thread.currentthread().getname());
thread.currentthread().sleep( 10000 );
} catch (interruptedexception e) {
e.printstacktrace();
}
}
}
});
executorservice.execute( new runnable() {
@override
public void run() {
for (;;) {
try {
system.out.println(thread.currentthread().getname());
thread.currentthread().sleep( 2 );
} catch (interruptedexception e) {
e.printstacktrace();
}
}
}
});
|
executors存在什么问题
在阿里巴巴java开发手册中提到,使用executors创建线程池可能会导致oom(outofmemory ,内存溢出),但是并没有说明为什么,那么接下来我们就来看一下到底为什么不允许使用executors?
我们先来一个简单的例子,模拟一下使用executors导致oom的情况.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
/**
* @author hollis
*/
public class executorsdemo {
private static executorservice executor = executors.newfixedthreadpool( 15 );
public static void main(string[] args) {
for ( int i = 0 ; i < integer.max_value; i++) {
executor.execute( new subthread());
}
}
}
class subthread implements runnable {
@override
public void run() {
try {
thread.sleep( 10000 );
} catch (interruptedexception e) {
//do nothing
}
}
}
|
通过指定jvm参数:-xmx8m -xms8m
运行以上代码,会抛出oom:
exception in thread "main" java.lang.outofmemoryerror: gc overhead limit exceeded
at java.util.concurrent.linkedblockingqueue.offer(linkedblockingqueue.java:416)
at java.util.concurrent.threadpoolexecutor.execute(threadpoolexecutor.java:1371)
at com.hollis.executorsdemo.main(executorsdemo.java:16)
以上代码指出,executorsdemo.java 的第16行,就是代码中的 executor.execute(new subthread());
java中的 blockingqueue
主要有两种实现, 分别是 arrayblockingqueue
和 linkedblockingqueue
.
arrayblockingqueue
是一个用数组实现的有界阻塞队列, 必须设置容量.
1
2
3
4
5
6
7
8
|
public arrayblockingqueue( int capacity, boolean fair) {
if (capacity <= 0 )
throw new illegalargumentexception();
this .items = new object[capacity];
lock = new reentrantlock(fair);
notempty = lock.newcondition();
notfull = lock.newcondition();
}
|
linkedblockingqueue
是一个用链表实现的有界阻塞队列, 容量可以选择进行设置, 不设置的话, 将是一个无边界的阻塞队列, 最大长度为 integer.max_value
.
1
2
3
|
public linkedblockingqueue() {
this (integer.max_value);
}
|
这里的问题就出在如果我们不设置 linkedblockingqueue
的容量的话, 其默认容量将会是 integer.max_value
.
而 newfixedthreadpool
中创建 linkedblockingqueue
时, 并未指定容量. 此时, linkedblockingqueue
就是一个无边界队列, 对于一个无边界队列来说, 是可以不断的向队列中加入任务的, 这种情况下就有可能因为任务过多而导致内存溢出问题.
newcachedthreadpool
和 newscheduledthreadpool
这两种方式创建的最大线程数可能是integer.max_value
, 而创建这么多线程, 必然就有可能导致oom.
threadpoolexecutor 创建线程池
避免使用 executors
创建线程池, 主要是避免使用其中的默认实现, 那么我们可以自己直接调用 threadpoolexecutor
的构造函数来自己创建线程池. 在创建的同时, 给 blockqueue
指定容量就可以了.
1
2
3
|
executorservice executor = new threadpoolexecutor( 10 , 10 ,
60l, timeunit.seconds,
new arrayblockingqueue( 10 ));
|
这种情况下, 一旦提交的线程数超过当前可用线程数时, 就会抛出 java.util.concurrent.rejectedexecutionexception
, 这是因为当前线程池使用的队列是有边界队列, 队列已经满了便无法继续处理新的请求.
除了自己定义 threadpoolexecutor
外. 还有其他方法. 如apache和guava等.
四个构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
public threadpoolexecutor( int corepoolsize,
int maximumpoolsize,
long keepalivetime,
timeunit unit,
blockingqueue<runnable> workqueue)
public threadpoolexecutor( int corepoolsize,
int maximumpoolsize,
long keepalivetime,
timeunit unit,
blockingqueue<runnable> workqueue,
threadfactory threadfactory)
public threadpoolexecutor( int corepoolsize,
int maximumpoolsize,
long keepalivetime,
timeunit unit,
blockingqueue<runnable> workqueue,
rejectedexecutionhandler handler)
public threadpoolexecutor( int corepoolsize,
int maximumpoolsize,
long keepalivetime,
timeunit unit,
blockingqueue<runnable> workqueue,
threadfactory threadfactory,
rejectedexecutionhandler handler)
|
int corepoolsize => 该线程池中核心线程数最大值
线程池新建线程的时候,如果当前线程总数小于corepoolsize, 则新建的是核心线程, 如果超过corepoolsize, 则新建的是非核心线程
核心线程默认情况下会一直存活在线程池中, 即使这个核心线程啥也不干(闲置状态).
如果指定 threadpoolexecutor 的 allowcorethreadtimeout
这个属性为 true
, 那么核心线程如果不干活(闲置状态)的话, 超过一定时间(时长下面参数决定), 就会被销毁掉
很好理解吧, 正常情况下你不干活我也养你, 因为我总有用到你的时候, 但有时候特殊情况(比如我自己都养不起了), 那你不干活我就要把你干掉了
int maximumpoolsize
该线程池中线程总数最大值
线程总数 = 核心线程数 + 非核心线程数.
long keepalivetime
该线程池中非核心线程闲置超时时长
一个非核心线程, 如果不干活(闲置状态)的时长超过这个参数所设定的时长, 就会被销毁掉
如果设置 allowcorethreadtimeout = true
, 则会作用于核心线程
timeunit unit
keepalivetime的单位, timeunit是一个枚举类型, 其包括:
1
2
3
4
5
6
7
|
timeunit.days; //天
timeunit.hours; //小时
timeunit.minutes; //分钟
timeunit.seconds; //秒
timeunit.milliseconds; //毫秒
timeunit.microseconds; //微妙
timeunit.nanoseconds; //纳秒
|
blockingqueue workqueue
一个阻塞队列, 用来存储等待执行的任务. 也就是说现在有10个任务, 核心线程 有四个, 非核心线程有六个, 那么这六个线程会被添加到 workqueue
中, 等待执行.
这个参数的选择也很重要, 会对线程池的运行过程产生重大影响, 一般来说, 这里的阻塞队列有以下几种选择:
synchronousqueue
: 这个队列接收到任务的时候, 会直接提交给线程处理, 而不保留它, 如果所有线程都在工作怎么办? 那就*新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumpoolsize而不能新建线程>的错误, 使用这个类型队列的时候, maximumpoolsize
一般指定成 integer.max_value
, 即无限大.
linkedblockingqueue
: 这个队列接收到任务的时候, 如果当前线程数小于核心线程数, 则核心线程处理任务; 如果当前线程数等于核心线程数, 则进入队列等待. 由于这个队列最大值为 integer.max_value
, 即所有超过核心线程数的任务都将被添加到队列中,这也就导致了 maximumpoolsize
的设定失效, 因为总线程数永远不会超过 corepoolsize.
arrayblockingqueue
: 可以限定队列的长度, 接收到任务的时候, 如果没有达到 corepoolsize 的值, 则核心线程执行任务, 如果达到了, 则入队等候, 如果队列已满, 则新建线程(非核心线程)执行任务, 又如果总线程数到了maximumpoolsize, 并且队列也满了, 则发生错误.
delayqueue
: 队列内元素必须实现 delayed 接口, 这就意味着你传进去的任务必须先实现delayed接口. 这个队列接收到任务时, 首先先入队, 只有达到了指定的延时时间, 才会执行任务.
threadfactory threadfactory
它是threadfactory类型的变量, 用来创建新线程.
默认使用 executors.defaultthreadfactory()
来创建线程. 使用默认的 threadfactory
来创建线程时, 会使新创建的线程具有相同的 norm_priority
优先级并且是非守护线程, 同时也设置了线程的名称.
rejectedexecutionhandler handler
表示当拒绝处理任务时的策略, 有以下四种取值:
threadpoolexecutor.abortpolicy:丢弃任务并抛出rejectedexecutionexception异常(默认).
threadpoolexecutor.discardpolicy:直接丢弃任务, 但是不抛出异常.
threadpoolexecutor.discardoldestpolicy:丢弃队列最前面的任务, 然后重新尝试执行任务(重复此过程)
threadpoolexecutor.callerrunspolicy:用调用者所在的线程来执行任务.
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。