Spring ThreadPoolTaskExecutor

时间:2022-11-03 00:07:53

1. ThreadPoolTaskExecutor配置

 1 <!-- spring thread pool executor -->
2 <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
3 <!-- 线程池维护线程的最少数量 -->
4 <property name="corePoolSize" value="5" />
5 <!-- 允许的空闲时间 -->
6 <property name="keepAliveSeconds" value="200" />
7 <!-- 线程池维护线程的最大数量 -->
8 <property name="maxPoolSize" value="10" />
9 <!-- 缓存队列 -->
10 <property name="queueCapacity" value="20" />
11 <!-- 对拒绝task的处理策略 -->
12 <property name="rejectedExecutionHandler">
13 <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
14 </property>
15 </bean>

属性字段说明

corePoolSize:线程池维护线程的最少数量

keepAliveSeconds:允许的空闲时间

maxPoolSize:线程池维护线程的最大数量

queueCapacity:缓存队列

rejectedExecutionHandler:对拒绝task的处理策略

2. execute(Runable)方法执行过程

如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。

如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maxPoolSize,建新的线程来处理被添加的任务。

如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maxPoolSize,那么通过handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程 maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。

当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

3. 示例代码

Junit Test

 1 @RunWith(SpringJUnit4ClassRunner.class)
2 @ContextConfiguration(classes = { MultiThreadConfig.class })
3 public class MultiThreadTest {
4
5 @Autowired
6 private ThreadPoolTaskExecutor taskExecutor;
7
8 @Autowired
9 private MultiThreadProcessService multiThreadProcessService;
10
11 @Test
12 public void test() {
13
14 int n = 20;
15 for (int i = 0; i < n; i++) {
16 taskExecutor.execute(new MultiThreadDemo(multiThreadProcessService));
17 System.out.println("int i is " + i + ", now threadpool active threads totalnum is " + taskExecutor.getActiveCount());
18 }
19
20 try {
21 System.in.read();
22 } catch (IOException e) {
23 throw new RuntimeException(e);
24 }
25 }
26 }

MultiThreadDemo

 1 /**
2 * 多线程并发处理demo
3 * @author daniel.zhao
4 *
5 */
6 public class MultiThreadDemo implements Runnable {
7
8 private MultiThreadProcessService multiThreadProcessService;
9
10 public MultiThreadDemo() {
11 }
12
13 public MultiThreadDemo(MultiThreadProcessService multiThreadProcessService) {
14 this.multiThreadProcessService = multiThreadProcessService;
15 }
16
17 @Override
18 public void run() {
19 multiThreadProcessService.processSomething();
20 }
21
22 }

MultiThreadProcessService

 1 @Service
2 public class MultiThreadProcessService {
3
4 public static final Logger logger = Logger.getLogger(MultiThreadProcessService.class);
5
6 /**
7 * 默认处理流程耗时1000ms
8 */
9 public void processSomething() {
10 logger.debug("MultiThreadProcessService-processSomething" + Thread.currentThread() + "......start");
11 try {
12 Thread.sleep(1000);
13 } catch (InterruptedException e) {
14 throw new RuntimeException(e);
15 }
16 logger.debug("MultiThreadProcessService-processSomething" + Thread.currentThread() + "......end");
17 }
18 }