SpringBoot系列 3 - 多线程数据处理(ThreadPoolTaskExecutor、DruidDataSource)

时间:2021-11-05 11:32:07

在上篇文章的基础上进行改造:

package com.hello.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; /**
* 线程工具类
* @author XIHONGLEI
* @date 2018-11-13
*/
@Component
public class ThreadPoolUtil {

    private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolUtil.class);

    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    /**
     * Hands a fire-and-forget task to the injected {@link ThreadPoolTaskExecutor}.
     *
     * <p>The task is wrapped so that a runtime failure is logged. Previously the
     * task was passed to {@code submit(...)} and the returned {@code Future} was
     * dropped, which meant an exception thrown by the task was captured inside
     * that future and never reported anywhere.
     *
     * @param task the work to run asynchronously; must not be {@code null}
     * @throws NullPointerException if {@code task} is {@code null}
     */
    public void executeTask(Runnable task) {
        java.util.Objects.requireNonNull(task, "task");
        threadPoolTaskExecutor.execute(() -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                // Nobody holds a Future for this task, so this is the only place
                // where the failure can be made visible.
                LOG.error("Asynchronous task failed", e);
            }
        });
    }
}
package com.hello;

import com.hello.filter.ApiInterceptor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport; /**
* 配置类
* @author XIHONGLEI
* @date 2018-10-31
*/
@SpringBootConfiguration
public class WebConfig extends WebMvcConfigurationSupport {

    /** Injected from the {@code server.port} property. */
    @Value("${server.port}")
    public String port;

    /** Injected from {@code threadpool.core-pool-size}. */
    @Value("${threadpool.core-pool-size}")
    private int corePoolSize;

    /** Injected from {@code threadpool.max-pool-size}. */
    @Value("${threadpool.max-pool-size}")
    private int maxPoolSize;

    /** Injected from {@code threadpool.queue-capacity}. */
    @Value("${threadpool.queue-capacity}")
    private int queueCapacity;

    /** Injected from {@code threadpool.keep-alive-seconds}. */
    @Value("${threadpool.keep-alive-seconds}")
    private int keepAliveSeconds;

    /**
     * Registers the project's {@link ApiInterceptor} after the parent class has
     * registered its own interceptors.
     *
     * @param registry the MVC interceptor registry to add to
     */
    @Override
    protected void addInterceptors(InterceptorRegistry registry) {
        super.addInterceptors(registry);
        ApiInterceptor apiInterceptor = new ApiInterceptor();
        registry.addInterceptor(apiInterceptor);
    }

    /**
     * Builds the task executor bean from the injected {@code threadpool.*} values.
     *
     * @return an executor configured with the core size, maximum size, queue
     *         capacity and keep-alive time read from configuration
     */
    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Sizing of the pool and of its waiting queue.
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // Rejection policy: a task the pool cannot accept is run by the caller.
        executor.setRejectedExecutionHandler(
                new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}
package com.hello.service.impl;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson.JSONArray;
import com.hello.entity.ContractDetailDto;
import com.hello.service.CheckPositionService;
import com.hello.util.ThreadPoolUtil;
import org.apache.ibatis.mapping.Environment;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;
import org.apache.ibatis.transaction.TransactionFactory;
import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List; @Service("checkPositionService")
public class CheckPositonServiceImpl implements CheckPositionService { @Autowired
private ThreadPoolUtil threadPoolUtil; private static DruidDataSource dataSourceMDB = null;
private static SqlSessionFactory sqlSessionFactory = null;
//声明Connection对象
Connection con;
//驱动程序名
String driver = "com.mysql.jdbc.Driver";
//URL指向要访问的数据库名mydata
String url = "jdbc:mysql://localhost:3306/db_hello?useUnicode=true&characterEncoding=utf8&connectTimeout=5000&socketTimeout=60000&autoReconnect=true&failOverReadOnly=false&allowMultiQueries=true";
//MySQL配置时的用户名
String user = "root";
//MySQL配置时的密码
String password = "root"; private SqlSession getSqlSession() throws Exception {
try{
if(dataSourceMDB == null || sqlSessionFactory == null){
dataSourceMDB = new DruidDataSource();
//设置连接参数
dataSourceMDB.setUrl(url);
dataSourceMDB.setDriverClassName(driver);
dataSourceMDB.setUsername(user);
dataSourceMDB.setPassword(password);
//配置初始化大小、最小、最大
dataSourceMDB.setInitialSize(10);
dataSourceMDB.setMinIdle(10);
dataSourceMDB.setMaxActive(5000);
//连接泄漏监测
//dataSourceMDB.setRemoveAbandoned(true);
//dataSourceMDB.setRemoveAbandonedTimeout(30);
//配置获取连接等待超时的时间
dataSourceMDB.setMaxWait(500000);
//配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
dataSourceMDB.setTimeBetweenEvictionRunsMillis(20000);
//防止过期
dataSourceMDB.setValidationQuery("SELECT 'x'");
dataSourceMDB.setTestWhileIdle(true);
dataSourceMDB.setTestOnBorrow(true); TransactionFactory transactionFactory = new JdbcTransactionFactory();
Environment environment = new Environment("development", transactionFactory, dataSourceMDB);
Configuration configuration = new Configuration(environment);
sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);
return sqlSessionFactory.openSession();
}else{
return sqlSessionFactory.openSession();
}
}catch(Exception e){
throw e;
}
}
private List<ContractDetailDto> getContractDetailList(Long contId){
try{
List<ContractDetailDto> list = new ArrayList<>();
SqlSession session = getSqlSession();
con = session.getConnection();
if(!con.isClosed()){
System.out.println("Succeeded connecting to the Database!");
} String sql = "SELECT cont_detail_id,detail_start_time,detail_end_time FROM ad_contract_detail WHERE is_del=0 AND cont_id=?";
PreparedStatement ps = con.prepareStatement(sql);
ps.setLong(1,contId);
ResultSet rs = ps.executeQuery();
while(rs.next()){
ContractDetailDto dto = new ContractDetailDto();
dto.setContDetailId(rs.getLong("cont_detail_id"));
dto.setDetailStartTime(rs.getDate("detail_start_time"));
dto.setDetailEndTime(rs.getDate("detail_end_time"));
list.add(dto);
}
rs.close();
con.close();
session.close();
return list;
} catch(Exception e) {
//数据库驱动类异常处理
System.out.println("Sorry,can`t find the Driver!");
e.printStackTrace();
}
return null;
} private List<ContractDetailDto> checkIsLock(Long contId,List<ContractDetailDto> dtoList,String threadName){
try{
SqlSession session = getSqlSession();
con = session.getConnection();
if(!con.isClosed()){
System.out.println("Succeeded connecting to the Database!");
}
List<ContractDetailDto> checkOutList = new ArrayList<>();
String sql = "SELECT ac.cont_name,ac.cont_id,ac.cont_code,acd.cont_detail_id,acdp.position_id,rp.position_name,rp.position_code " +
"FROM rs_position rp " +
"INNER JOIN ad_contract_detail_point acdp ON rp.id = acdp.position_id " +
"INNER JOIN ad_contract_detail acd ON acdp.cont_detail_id = acd.cont_detail_id " +
"INNER JOIN rs_put_plan rpp ON acd.cont_detail_id = rpp.contract_detail_id " +
"INNER JOIN ad_contract ac ON ac.cont_id = rpp.contract_id " +
"WHERE rpp.point_stauts != 1 AND acd.is_del = 0 AND ac.is_del=0 " +
"AND rpp.contract_id NOT IN (?) " +
"AND ( " +
" (acd.detail_start_time >= ? AND acd.detail_start_time <= ?) " +
" OR (acd.detail_start_time <= ? AND acd.detail_end_time >= ?) " +
" OR (acd.detail_end_time >= ? AND acd.detail_end_time <= ?) " +
") " +
"AND rp.id IN ( " +
" SELECT position_id FROM ad_contract_detail_point WHERE cont_detail_id = ? " +
")";
int i = 1;
for(ContractDetailDto dto : dtoList){
System.out.println("[".concat(threadName).concat("]正在执行第:").concat(i+"").concat("条"));
PreparedStatement ps = con.prepareStatement(sql);
ps.setLong(1,contId);
java.sql.Date sTime = new java.sql.Date(dto.getDetailStartTime().getTime());
java.sql.Date eTime = new java.sql.Date(dto.getDetailEndTime().getTime());
ps.setDate(2,sTime);
ps.setDate(3,eTime);
ps.setDate(4,sTime);
ps.setDate(5,eTime);
ps.setDate(6,sTime);
ps.setDate(7,eTime);
ps.setLong(8,dto.getContDetailId());
ResultSet rs = ps.executeQuery();
while(rs.next()){
ContractDetailDto cdDto = new ContractDetailDto();
cdDto.setContDetailId(rs.getLong("cont_detail_id"));
cdDto.setPositionId(rs.getLong("position_id"));
cdDto.setPositionCode(rs.getString("position_code"));
cdDto.setPositionName(rs.getString("position_name"));
cdDto.setContId(rs.getLong("cont_id"));
cdDto.setContName(rs.getString("cont_name"));
cdDto.setContCode(rs.getString("cont_code"));
checkOutList.add(cdDto);
}
i+=1;
rs.getStatement().close();
rs.close();
ps.close();
}
con.close();
session.close();
return checkOutList;
} catch(Exception e) {
//数据库驱动类异常处理
System.out.println("Sorry,can`t find the Driver!");
e.printStackTrace();
}
return null;
} @Override
public void checkPosition(Long contId) {
List<ContractDetailDto> detailDtoList = this.getContractDetailList(contId);
if(detailDtoList != null && detailDtoList.size() > 0){
List<ContractDetailDto> threadList1 = new ArrayList<>();
List<ContractDetailDto> threadList2 = new ArrayList<>();
List<ContractDetailDto> threadList3 = new ArrayList<>();
if(detailDtoList.size() <= 3){
threadList1.addAll(detailDtoList);
}else{
int stepLen = detailDtoList.size() / 3;
int i = 1;
for(ContractDetailDto dto : detailDtoList){
if(i <= stepLen){
threadList1.add(dto);
}else if(i> stepLen && i <= stepLen * 2){
threadList2.add(dto);
}else{
threadList3.add(dto);
}
i += 1;
}
}
if(threadList1.size() > 0){
Runnable runnable = () ->{
List<ContractDetailDto> checkOutList = this.checkIsLock(contId,threadList1,"线程1");
try {
System.out.println("[线程1]最后结果是:");
JSONArray array = new JSONArray();
for(ContractDetailDto dto : checkOutList){
if(dto.getPositionId() != null && dto.getPositionId() != 0) {
array.add(dto.toJsonString());
}
}
System.out.println(array);
}catch (Exception e){
e.printStackTrace();
}
};
threadPoolUtil.executeTask(runnable);
}
if(threadList2.size() > 0){
Runnable runnable = () ->{
List<ContractDetailDto> checkOutList = this.checkIsLock(contId,threadList2,"线程2");
try {
System.out.println("[线程2]最后结果是:");
JSONArray array = new JSONArray();
for(ContractDetailDto dto : checkOutList){
if(dto.getPositionId() != null && dto.getPositionId() != 0) {
array.add(dto.toJsonString());
}
}
System.out.println(array);
}catch (Exception e){
e.printStackTrace();
}
};
threadPoolUtil.executeTask(runnable);
}
if(threadList3.size() > 0){
Runnable runnable = () ->{
List<ContractDetailDto> checkOutList = this.checkIsLock(contId,threadList2,"线程3");
try {
System.out.println("[线程3]最后结果是:");
JSONArray array = new JSONArray();
for(ContractDetailDto dto : checkOutList){
if(dto.getPositionId() != null && dto.getPositionId() != 0) {
array.add(dto.toJsonString());
}
}
System.out.println(array);
}catch (Exception e){
e.printStackTrace();
}
};
threadPoolUtil.executeTask(runnable);
}
}
}
}