转载-SpringBoot结合线程池解决多线程问题实录;以及自己的总结

时间:2024-08-19 19:06:32

原文地址:https://blog.****.net/GFJ0814/article/details/92422245

看看这篇文章(继续学习):https://www.jianshu.com/p/362b365e1bcc

背景:线上有一个接口,3台机器总共QPS在3000左右,单机QPS在1000左右,接口响应时间2ms。为了保证接口的任何改动在上线之前能够在大流量下能够没有问题。提出想法,搭建一套流量回放环境,上线之前把代码先部署到流量回放环境。然后将线上的流量导入到流量回放环境,用真实的业务请求来做模拟测试,这个过程我们称作是流量回放。

​ 为了保证流量回放的时候,流量导入过程,不能影响正常的线上接口请求,也就是响应时间不能有变化。首先就要考虑启动一个线程来异步处理这个事情。好,按照这个想法,写了第一版本代码。(以下代码是有问题的,只怪我too young too sample)

@RequestMapping(value = "/flags", method = RequestMethod.POST)
@ResponseBody
public ServerResponse getFlagsPost(@RequestBody NewServerParam param) {
//如果流量回放开关打开,就创建线程,将请求发送到流量回放环境
if(Constant.TRAFFIC_REPLAY_FLAG){
Runnable r = ()->{
HttpClientUtil.post(trafficReplayUrl+"sample/v2/flags", JSON.toJSONString(param),10000,"application/json");
};
Thread thread = new Thread(r);
thread.start();
}
long a = System.currentTimeMillis();
ServerResponse response = new ServerResponse();
response.setFlags(serverService.getFlags(param, param.getExp_key()));
long b = System.currentTimeMillis();
LOGGER.info(logService.parseToJSon(param, response, (b - a), LOGGER, Level.INFO.getName()));
return response;
}

线下测试没有问题,一上线,大流量上来,服务器瞬间报错上万条,马上回滚。

报错提示:

​ java.lang.OutOfMemoryError: Unable to create new native thread

这个错误的意思是:程序创建的线程数量已达到上限值

剖析错误
​ JVM向操作系统申请创建新的 native thread(原生线程)时, 就有可能会碰到 java.lang.OutOfMemoryError: Unable to create new native thread 错误. 如果底层操作系统创建新的 native thread 失败, JVM就会抛出相应的OutOfMemoryError. 原生线程的数量受到具体环境的限制, 通过一些测试用例可以找出这些限制, 请参考下文的示例. 但总体来说, 导致 java.lang.OutOfMemoryError: Unable to create new native thread 错误的场景大多经历以下这些阶段:

java程序向jvm申请创建一个线程

jvm本地代码(native code)代理该请求,尝试创建一个操作系统级别的native Thread(原生线程)

操作系统尝试创建一个新的native Thread,需要同时分配一些内存给该线程

如果操作系统的虚拟内存已经耗尽,或者受到32位进程的地址空间限制(约2-4GB),OS就会拒绝本地内存分配

JVM抛出 java.lang.OutOfMemoryError: Unable to create new native thread 错误。

参考:https://blog.****.net/renfufei/article/details/78088553

改进方案-springBoot整合线程池优化

​ 错误很明显,就是创建线程数量过多,超过OS所能允许的最大空间。那这个问题,完全就可以用线程池去解决,用线程池维护一定数量的线程,防止无限制的创建线程,带来的内存开销过大。

代码改进:

​ 1. 创建线程池配置类

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor; /**
* @description: 线程池配置类
**/
@Configuration //表示这个类是配置类
@EnableAsync //表示这个类是线程池配置类
public class ExecutorConfig { private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);
@Bean
public Executor asyncServiceExecutor(){
logger.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
int corePoolSize = Runtime.getRuntime().availableProcessors();
//配置核心线程数
executor.setCorePoolSize(corePoolSize*2);
//配置最大线程数
executor.setMaxPoolSize(corePoolSize*2);
//配置队列大小
executor.setQueueCapacity(99999);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("async-service-");
// rejection-policy:当pool已经达到max size的时候,并且队列已经满了,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
//DiscardPolicy: 直接丢弃
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
//执行初始化
executor.initialize();
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
} }

2.创建线程信息打印的类,这样在执行线程池执行excute方法的时候,会把当前的任务的情况打印出来

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture; import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor; /**
* @description: 获取线程池的监控信息
**/
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class); private void showThreadPoolInfo(String prefix){
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
if(null == threadPoolExecutor){
return;
}
logger.info("{},{},taskCount[{}],completedTaskCount[{}],activeCount[{}],queuesize[{}]",
this.getThreadNamePrefix(),
prefix,
threadPoolExecutor.getTaskCount(),
threadPoolExecutor.getCompletedTaskCount(),
threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getQueue().size()
);
} @Override
public void execute(Runnable task) {
showThreadPoolInfo("1. do execute");
super.execute(task);
} @Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("2. do execute");
super.execute(task, startTimeout);
} @Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("1. do submit");
return super.submit(task);
} @Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("2. do submit");
return super.submit(task);
} @Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("1. do submitListenable");
return super.submitListenable(task);
} @Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("2. do submitListenable");
return super.submitListenable(task);
} }

创建任务接口

创建任务实现类

import com.alibaba.fastjson.JSONObject;

/**
* 异步任务接口
*/
public interface AsyncService { void trfficRepalyForFlagV2(String param); } import com.alibaba.fastjson.JSONObject;
import com.lianjia.platform.sampling.util.HttpClientUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; /**
* @description: 异步任务实现类
**/
@Service
public class AsyncServiceImpl implements AsyncService {
private final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class); @Value("${URL.trafficReplayUrl}")
private String trafficReplayUrl; //获取yml文件中配置的流量回放环境的URL @Override
@Async("asyncServiceExecutor")//这里要使用定义的线程池配置的Bean的方法名
public void trfficRepalyForFlagV2(String param) {
HttpClientUtil.post(trafficReplayUrl+"sample/v2/flags", param,10000,"application/json");
} }

使用任务

@RequestMapping(value = "/flags", method = RequestMethod.POST)
@ResponseBody
public ServerResponse getFlagsPost(@RequestBody NewServerParam param) {
//如果流量回放开关打开,就创建线程,将请求发送到流量回放环境
if(Constant.TRAFFIC_REPLAY_FLAG){
asyncService.trfficRepalyForFlagV2(JSON.toJSONString(param));
}
long a = System.currentTimeMillis();
ServerResponse response = new ServerResponse();
response.setFlags(serverService.getFlags(param, param.getExp_key()));
long b = System.currentTimeMillis();
LOGGER.info(logService.parseToJSon(param, response, (b - a), LOGGER, Level.INFO.getName()));
return response;
}

压测结果:

​ 压测数据:并发数50,压测时间10min。并发数=QPS(1000)*响应时间(0.02s)。

​ 之前因为上线之前没有做压测,导致了上线之后,大流量下报错。吃一堑,长一智。这次改完之后做了压测。压测之后,第一,打开流量开关之后,不报错了;第二,主线程平均响应耗时和不开启异步任务时候的平均响应耗时基本一致。证明方案是可以的。

插曲:拒绝策略使用不当导致主线程平均响应时间非常大。第一次在写线程池配置类的时候,使用了executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());的拒绝策略。队列的大小设置了3000,在压测的时候发现并发数50(一般计算是并发数=QPS * 响应时间)左右,这个和线上单台机器的QPS基本接近,虽然不报错了主线程的接口耗时远远超出了不打开开关的接口耗时。通过打印信息来看,是因为提交的任务量非常大,队列中的任务已经把队列填满了,这个时候,从线程池原理来看,要去创建线程数达到maxpoolsize,我们这里设置的maxPoolsize和corePoolsize大小是一样的。意味着就不会再去创建线程了,只能走拒绝策略。这里的拒绝策略CallerRunsPolicy的含义是如果异步线程执行不了,就由调用线程处理,实际上就是主线程来处理,这样就会导致主线程的部分流量回放任务成了同步的了。这当然会增大主线程的接口响应时间了。因为我们只需要有部分线上流量了其实就可以了,因此,我把拒绝策略改为了直接丢弃。这样处理不了的线程不进入队列,也不由主线程执行,保证主线程的响应时间不受影响。

关于线程池与线程的个人理解

线程池是管理线程的地方。线程池分为程序中自定义的,也有数据库自己的线程池。
需要做的就是协调俩者,避免出现程序中运行的线程过多导致等待时间过长报错。
还有就是线程池的处理策略。避免出现策略选择错误导致出错。 https://blog.****.net/qiuyubo1/article/details/80288525
这篇文章是介绍异步和多线程之间的区别 异步是多线程的最终期待结果。