springBoot服务整合线程池ThreadPoolTaskExecutor与@Async详解使用

时间:2023-03-09 14:51:48
springBoot服务整合线程池ThreadPoolTaskExecutor与@Async详解使用

ThreadPoolExecutor:=======这个是java自己实现的线程池执行类,基本上创建线程池都是通过这个类进行的创建。
ThreadPoolTaskExecutor:========这个是springboot基于ThreadPoolExecutor实现的一个线程池执行类。

注意:

在springboot当中,如果没有配置线程池的话,springboot会自动配置一个ThreadPoolTaskExecutor线程池到bean当中,我们调用只需要

@Autowired

ThreadPoolTaskExecutor  threadPoolTaskExecutor;

第一步: 首先在application启动类添加@EnableAsync

@SpringBootApplication
@EnableAsync //首先在application启动类添加@EnableAsync
public class ThreadpoolApplication {
public static void main(String[] args) {
SpringApplication.run(ThreadpoolApplication.class, args);
}
}

第二步:配置线程池,不配置的话使用springboot默认的线程池。

package com.aswatson.csc.task.conf;

import java.util.concurrent.Executor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @Configuration
@EnableAsync
public class AsyncThreadConfiguration { @Bean("kafkaThreadExecutor")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);//核心线程数
executor.setMaxPoolSize(20);//最大线程数
executor.setKeepAliveSeconds(60);//空闲线程存活时间
executor.setThreadNamePrefix("kafkaThreadAsync-");
executor.initialize();
return executor;
} }

第三步:测试1:在需要异步执行的方法上加上@Async注解。

@Service
public class AsyncTest {
protected final Logger logger = LoggerFactory.getLogger(this.getClass()); @Async
public void hello(String name){
customerEventLogMapper.insert(customerEventLog);
logger.info("异步线程启动 started."+name);
}
}

第四步:测试2:使用注入的模式:

package com.example.apidemo.completableFutrue;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service; import java.time.LocalDateTime; @Service
public class AsyncService { @Autowired
ThreadPoolTaskExecutor threadPoolTaskExecutor; public void addEventLog(String buId, String status){
CustomerEventLogPO customerEventLog = new CustomerEventLogPO();
customerEventLog.setUuid(uuid);
customerEventLog.setStatus(status);
customerEventLog.setCreated(LocalDateTime.now());
customerEventLogMapper.insert(customerEventLog); threadPoolTaskExecutor.submit(new Thread(()->{
customerEventLogMapper.insert(customerEventLog);
})); //submit有返回值 threadPoolTaskExecutor.execute(new Thread(()->{
customerEventLogMapper.insert(customerEventLog);
})); //execute无返回值
} }

注意: 如果配置多个线程池,该如何指定线程池呢?

方式1: @Resources("kafkaThreadExecutor")。

方式2: 如果有多个线程池,但是在@Async注解里面没有指定的话,会默认加载第一个配置的线程池。

======================================================================================================================================================================

// @Bean()的拒绝策略:
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

拒绝策略:如果(总任务数 - 核心线程数 - 任务队列数)-(最大线程数 - 核心线程数)> 0 的话,则会出现线程拒绝。举例:( 12 - 5 - 2 ) - ( 8 - 5 ) > 0,会出现线程拒绝。线程拒绝又分为 4 种策略,分别为:

    • CallerRunsPolicy():交由调用方线程运行,比如 main 线程。
    • AbortPolicy():直接抛出异常。
    • DiscardPolicy():直接丢弃。
    • DiscardOldestPolicy():丢弃队列中最老的任务。