delayedQueueMap
使用ConcurrentHashMap
保证线程安全。使用统一异常处理方式,方便故障排查。
优化点 |
描述 |
---|---|
缓存队列对象 | 避免重复初始化 |
双重检查锁 | 防止多线程下并发初始化,提高线程安全性和效率。 |
统一日志格式 | 清晰易懂,便于排查问题。 |
异步投递接口 | 提升吞吐能力,适用于高并发场景。 |
sendIfPresent 逻辑优化 |
移除已存在元素后重新添加,确保更新逻辑准确。 |
高扩展性结构 | 支持多个队列名称,便于扩展不同业务队列。 |
. 线程安全本地缓存
使用
ConcurrentHashMap
和computeIfAbsent
,避免并发初始化导致队列重复创建。
2. 统一序列化配置
使用
JsonJacksonCodec
,确保延迟消息的序列化在 Redis 分布式环境中兼容性强,支持多服务跨语言。
3. 异步发送接口优化
提供
sendAsync
和sendAsyncIfAbsent
,适合高并发不阻塞场景。
4. 队列重用与延迟队列封装解耦
支持动态创建多个队列,适合大规模系统中多个业务模块隔离使用。
5. 可扩展性设计
后续可扩展为队列优先级、回调通知、队列消费监听等模块。
6. 兼容 Redis Cluster 架构
Redisson 内部会处理 Cluster 路由逻辑,避免手动分片。
/**
* 延迟消息生产者,基于 Redisson 实现高可用、高并发的分布式延迟队列
*/
@Component
public class DelayMessageProducer {
private static final Logger log = LoggerFactory.getLogger(DelayMessageProducer.class);
@Autowired
private RedissonClient redissonClient;
// 本地缓存已创建的延迟队列,提升性能,避免重复创建
private final Map<String, RDelayedQueue<Object>> delayedQueueMap = new ConcurrentHashMap<>(4);
/**
* 初始化延迟队列,使用本地缓存减少 Redisson 重复创建开销
*/
private RDelayedQueue<Object> initDelayQueue(String queueName) {
return delayedQueueMap.computeIfAbsent(queueName, name -> {
RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(name, new JsonJacksonCodec());
return redissonClient.getDelayedQueue(blockingDeque);
});
}
}
springboot,springcloud启动过程
1. 启动类的执行(Main方法)
Spring Boot 的启动过程从 @SpringBootApplication
注解的类的 main
方法开始。该方法调用了 SpringApplication.run()
来启动整个 Spring Boot 应用。@SpringBootApplication
注解是一个组合注解,它包含了:
@Configuration
:表明该类是配置类。@EnableAutoConfiguration
:启用 Spring Boot 的自动配置。@ComponentScan
:启动组件扫描,扫描该类所在包及其子包中的 Bean。
2. SpringApplication.run() 方法
SpringApplication.run()
是 Spring Boot 启动过程中的关键方法。它会完成以下任务:
创建并配置
SpringApplication
对象。启动嵌入式的 Web 服务器(如 Tomcat、Jetty)。
初始化 Spring 容器(ApplicationContext)。
执行一些其他的初始化操作,比如命令行参数解析、环境配置等。
3. 初始化 SpringApplication 对象
在执行 SpringApplication.run()
时,SpringApplication
会被初始化,它会做以下几件事:
设置
ApplicationContext
(默认是AnnotationConfigApplicationContext
)。设置
Banner
(Banner 是 Spring Boot 启动时显示的 ASCII 字符图标)。设置
CommandLineRunner
和ApplicationRunner
接口的 Bean,它们会在应用启动后执行。
Spring Boot 启动过程大致如下:
执行
main
方法,调用SpringApplication.run()
启动应用。初始化
SpringApplication
,加载配置。创建并初始化
ApplicationContext
,扫描并注册 Bean。自动配置系统根据依赖自动配置应用。
发布应用启动事件。
执行
CommandLineRunner
和ApplicationRunner
中的代码(如果定义)。启动完成,应用开始运行。
Spring Cloud 是基于 Spring Boot 构建的分布式系统的开发框架,旨在简化微服务架构的构建。它提供了很多用于微服务架构的解决方案,如服务发现、负载均衡、配置管理、消息驱动等。
Spring Cloud 的启动过程依赖于 Spring Boot 启动过程,但它有自己的一些额外步骤,主要用于处理服务注册与发现、配置管理、服务通信等。下面是 Spring Cloud 启动的主要过程:
1. 启动类的执行(Main 方法)
与 Spring Boot 类似,Spring Cloud 的应用通常也从一个 main
方法开始。在这个方法中,调用 SpringApplication.run()
来启动应用。@SpringCloudApplication
注解是 Spring Cloud 的启动注解,它是一个组合注解,包含了:
@SpringBootApplication
:包含了 Spring Boot 启动的所有功能。@EnableDiscoveryClient
或@EnableEurekaClient
:启用服务发现客户端,这让服务能够注册到 Eureka 或其他服务注册中心。@EnableCircuitBreaker
:启用熔断器机制,防止调用失败时影响其他服务。
Spring Cloud 是基于 Spring Boot 构建的分布式系统的开发框架,旨在简化微服务架构的构建。它提供了很多用于微服务架构的解决方案,如服务发现、负载均衡、配置管理、消息驱动等。
Spring Cloud 的启动过程依赖于 Spring Boot 启动过程,但它有自己的一些额外步骤,主要用于处理服务注册与发现、配置管理、服务通信等。下面是 Spring Cloud 启动的主要过程:
1. 启动类的执行(Main 方法)
与 Spring Boot 类似,Spring Cloud 的应用通常也从一个 main
方法开始。在这个方法中,调用 SpringApplication.run()
来启动应用。@SpringCloudApplication
注解是 Spring Cloud 的启动注解,它是一个组合注解,包含了:
@SpringBootApplication
:包含了 Spring Boot 启动的所有功能。@EnableDiscoveryClient
或@EnableEurekaClient
:启用服务发现客户端,这让服务能够注册到 Eureka 或其他服务注册中心。@EnableCircuitBreaker
:启用熔断器机制,防止调用失败时影响其他服务。
2. SpringApplication.run() 方法
SpringApplication.run()
是 Spring Boot 启动的关键方法,也同样适用于 Spring Cloud,它会执行以下步骤:
创建并配置
SpringApplication
对象。启动应用的上下文(
ApplicationContext
)。加载并初始化 Spring Boot 自动配置。
初始化服务注册中心,开始与服务发现交互。
在 Spring Cloud 中,如果应用涉及服务注册与发现(如 Eureka 或 Consul),这一步会连接到注册中心,进行服务注册。
3. 服务发现与注册
如果应用是一个服务消费者或提供者,并且启用了服务发现机制(如 Eureka),则 Spring Cloud 会:
启动服务发现客户端(如
EurekaClient
),并自动注册到服务发现平台。每个服务都会将其实例信息(如 IP 地址、端口等)注册到服务注册中心(如 Eureka)。
服务消费者在启动时会自动从服务注册中心获取服务列表,使用负载均衡进行调用。
4. 配置管理
Spring Cloud 提供了配置中心(如 Spring Cloud Config Server),允许将应用的配置从集中式的配置服务器中获取。Spring Cloud 在启动时会尝试连接配置服务器,并加载应用的配置。
如果启用了 Spring Cloud Config,启动时会从配置服务器获取配置信息(如数据库连接信息、微服务配置等),这些信息会被自动加载到 Spring 环境中。
5. 熔断器与负载均衡
Spring Cloud 还集成了 Netflix 的一些工具,如 Hystrix(熔断器)和 Ribbon(负载均衡),帮助处理服务调用的可靠性和负载均衡。启动时,Spring Cloud 会根据配置启动这些组件,确保服务调用的容错性。
Hystrix:通过
@EnableCircuitBreaker
注解启用熔断器,防止调用失败时影响其他服务。Ribbon:启用客户端负载均衡,服务消费者会根据 Ribbon 自动选择一台合适的服务提供者。
Spring Cloud 启动过程的主要步骤如下:
执行
main
方法,调用SpringApplication.run()
启动应用。初始化 Spring Boot 的核心功能,加载配置。
启动服务注册与发现,服务注册到注册中心。
加载和初始化 Spring Cloud 相关功能(如配置中心、熔断器、负载均衡等)。
执行
CommandLineRunner
和ApplicationRunner
中的代码(如果定义)。启动完成,应用开始接受请求。
提高并发性能:
使用异步处理Kafka消息发送,避免在主线程中阻塞,提升性能。
可以通过增加并发线程池来处理多个消息,提高吞吐量。
提高高可用性:
如果Kafka发送失败,可以进行重试或者备用处理,以确保消息不丢失。
对于消息处理,可以考虑使用队列处理和死信队列的机制,避免消息丢失和延时。
优化代码逻辑:
代码中没有处理异常情况,我们可以增加异常处理和日志记录。
可以使用缓存机制减少频繁调用的数据库操作。
解释:
异步消息处理:
在
handle()
方法中,使用ExecutorService
提交异步任务来处理 Kafka 消息的发送。这样可以避免消息处理阻塞主线程,提高系统的并发处理能力。