前言
我们在日常开发中,大概率会遇到如下场景:主线程阻塞,并获取多个子线程执行任务的结果。而我们借助ThreadPoolExecutor和CountDownLatch可以高效率处理多个任务。
内容
代码示例
1.记录线程处理结果
@Data
public class CommunicationCheckResult {
private boolean mysqlCommunicationCheck;
// private boolean redisCommunicationCheck;
private boolean rabbitmqCommunicationCheck;
}
2.多线程处理
/**
* @author: coffee
* @date: 2023/3/30 12:17 PM
* @description: ...
*/
@Slf4j
@Component
public class CommunicationService {
private static final String REDIS_COMMUNICATION_CHECK_LOCK_KEY = "communication.check.key";
@Autowired
private RedissonClient redissonClient;
@Autowired
private AmqpAdmin amqpAdmin;
/**
* 功能描述:相关服务通讯网络检查
* @return result
*/
public CommunicationCheckResult communicationCheck (CommunicationCheckRequest request) {
String processId = UUID.randomUUID().toString();
log.info("processId:[{}],开始进行网络通讯检查... request:[{}]", processId, JsonUtils.toJson(request));
AssertUtil.isTrue(Objects.nonNull(request));
AssertUtil.isTrue(StringUtils.hasText(request.getUserId()), "userId is empty.");
/* 分布式锁控制 (根据业务场景考虑是否使用分布式锁) */
RLock lock = redissonClient.getLock(REDIS_COMMUNICATION_CHECK_LOCK_KEY);
if(!lock.tryLock()) {
log.info("lock conflict");
return null;
}
/* 流程处理 */
CommunicationCheckResult result = new CommunicationCheckResult();
try {
handler(processId, request, result);
} catch (Exception e) {
log.error("processId:[{}], fail", processId, e);
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
log.info("processId:[{}],完成网络通讯检查... result:[{}]", processId, JsonUtils.toJson(result));
return result;
}
private void handler(String processId, CommunicationCheckRequest request, CommunicationCheckResult result)
throws InterruptedException {
/* 自定义线程池 */
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
10,
20,
1L,
TimeUnit.MINUTES,
new ArrayBlockingQueue<>(1)
);
/*
注意事项:如果serverCount=3,但是只对2个服务做了处理,会有问题。 因为countDownLatch会一直释放不了。
*/
int serverCount = CommunicationCheckResult.class.getDeclaredFields().length;
CountDownLatch countDownLatch = new CountDownLatch(serverCount);
log.info("processId:[{}], 待检查服务数量:[{}]", processId, serverCount);
threadPoolExecutor.submit(()->{
try{
/* 开始检查MySQL数据库 */
checkMysql(result);
log.info("processId:[{}], 1.MySQL通讯正常", processId);
} catch (Exception e) {
result.setMysqlCommunicationCheck(false);
log.error("mysql 通讯异常", e);
} finally {
countDownLatch.countDown();
}
});
threadPoolExecutor.submit(()->{
try{
/* 开始检查rabbitmq */
checkRabbitmq(result);
log.info("processId:[{}], 3.rabbitmq通讯正常", processId);
} catch (Exception e) {
result.setMysqlCommunicationCheck(false);
log.error("rabbitmq 通讯异常", e);
} finally {
countDownLatch.countDown();
}
});
log.info("processId:[{}], 【========================主线程阻塞========================】", processId);
/* 阻塞主线程 */
countDownLatch.await();
log.info("processId:[{}], 【========================主线程结束========================】", processId);
}
private void checkRabbitmq(CommunicationCheckResult result) {
String queueName = "test-queue";
amqpAdmin.getQueueProperties(queueName);
result.setRabbitmqCommunicationCheck(true);
}
}
关键点说明
1、ThreadPoolExecutor自定义线程池:定义线程池,每调用一次submit就是提交一次任务
。
2、CountDownLatch线程计数器:用于阻塞主线程,当所有子线程执行完毕,再回到主线程
。
- 注意事项:如果定义10,那么countDownLatch.countDown()必须调用10次。只有在结束计数的时候,才会重新执行到主线程;
3、获取线程处理结果:借助Java的引用类型
存储数据,可解决多线程数据存储的问题。
代码优化
1、如果感觉一个一个的submit提交任务太麻烦,可以考虑借助枚举+Switch的方式优化:
// 定义服务枚举
ServerEnum[] array = {REDIS,RABBIT_MQ};
for (int i = 0 ; i < array.length; i++) {
switch (array[i]) {
case REDIS:
checkRedis(threadPoolExecutor, countDownLatch);
break;
default:
throw new RuntimeException();
}
}
=======================
void checkRedis(threadPoolExecutor, countDownLatch) {
threadPoolExecutor.submit(()->{
try{
/* 开始检查redis */
checkRedis(result);
log.info("processId:[{}], 2.redis通讯正常", processId);
} catch (Exception e) {
result.setRedisCommunicationCheck(false);
log.error("redis 通讯异常", e);
} finally {
countDownLatch.countDown();
}
});
}
总结
关于线程实战的更多知识,转到:【Java线程】线程篇 - 从理论到具体代码案例最全线程知识点梳理(持续更新中…) – 四、线程实战篇