分布式微服务系统架构第95集:基于 Redisson 延迟队列,springboot,springcloud启动过程,策略模式

时间:2025-04-07 14:50:32
  • delayedQueueMap 使用 ConcurrentHashMap 保证线程安全。

  • 使用统一异常处理方式,方便故障排查。

优化点

描述

缓存队列对象

避免重复初始化 RBlockingDeque 和 RDelayedQueue 提升性能,支持高并发。

双重检查锁

防止多线程下并发初始化,提高线程安全性和效率。

统一日志格式

清晰易懂,便于排查问题。

异步投递接口

提升吞吐能力,适用于高并发场景。

sendIfPresent 逻辑优化

移除已存在元素后重新添加,确保更新逻辑准确。

高扩展性结构

支持多个队列名称,便于扩展不同业务队列。

线程安全本地缓存
  • 使用 ConcurrentHashMap 和 computeIfAbsent,避免并发初始化导致队列重复创建。

2. 统一序列化配置
  • 使用 JsonJacksonCodec,确保延迟消息的序列化在 Redis 分布式环境中兼容性强,支持多服务跨语言。

3. 异步发送接口优化
  • 提供 sendAsync 和 sendAsyncIfAbsent,适合高并发不阻塞场景。

4. 队列重用与延迟队列封装解耦
  • 支持动态创建多个队列,适合大规模系统中多个业务模块隔离使用。

5. 可扩展性设计
  • 后续可扩展为队列优先级、回调通知、队列消费监听等模块。

6. 兼容 Redis Cluster 架构
  • Redisson 内部会处理 Cluster 路由逻辑,避免手动分片。

/**
 * 延迟消息生产者,基于 Redisson 实现高可用、高并发的分布式延迟队列
 */
@Component
public class DelayMessageProducer {
    private static final Logger log = LoggerFactory.getLogger(DelayMessageProducer.class);

    @Autowired
    private RedissonClient redissonClient;

    // 本地缓存已创建的延迟队列,提升性能,避免重复创建
    private final Map<String, RDelayedQueue<Object>> delayedQueueMap = new ConcurrentHashMap<>(4);

    /**
     * 初始化延迟队列,使用本地缓存减少 Redisson 重复创建开销
     */
    private RDelayedQueue<Object> initDelayQueue(String queueName) {
        return delayedQueueMap.computeIfAbsent(queueName, name -> {
            RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(name, new JsonJacksonCodec());
            return redissonClient.getDelayedQueue(blockingDeque);
        });
    }
}

springboot,springcloud启动过程

1. 启动类的执行(Main方法)

Spring Boot 的启动过程从 @SpringBootApplication 注解的类的 main 方法开始。该方法调用了 SpringApplication.run() 来启动整个 Spring Boot 应用。@SpringBootApplication 注解是一个组合注解,它包含了:

  • @Configuration:表明该类是配置类。

  • @EnableAutoConfiguration:启用 Spring Boot 的自动配置。

  • @ComponentScan:启动组件扫描,扫描该类所在包及其子包中的 Bean。

2. SpringApplication.run() 方法

SpringApplication.run() 是 Spring Boot 启动过程中的关键方法。它会完成以下任务:

  • 创建并配置 SpringApplication 对象。

  • 启动嵌入式的 Web 服务器(如 Tomcat、Jetty)。

  • 初始化 Spring 容器(ApplicationContext)。

  • 执行一些其他的初始化操作,比如命令行参数解析、环境配置等。

3. 初始化 SpringApplication 对象

在执行 SpringApplication.run() 时,SpringApplication 会被初始化,它会做以下几件事:

  • 设置 ApplicationContext(默认是 AnnotationConfigApplicationContext)。

  • 设置 Banner(Banner 是 Spring Boot 启动时显示的 ASCII 字符图标)。

  • 设置 CommandLineRunner 和 ApplicationRunner 接口的 Bean,它们会在应用启动后执行。

Spring Boot 启动过程大致如下:

  1. 执行 main 方法,调用 SpringApplication.run() 启动应用。

  2. 初始化 SpringApplication,加载配置。

  3. 创建并初始化 ApplicationContext,扫描并注册 Bean。

  4. 自动配置系统根据依赖自动配置应用。

  5. 发布应用启动事件。

  6. 执行 CommandLineRunner 和 ApplicationRunner 中的代码(如果定义)。

  7. 启动完成,应用开始运行。

Spring Cloud 是基于 Spring Boot 构建的分布式系统的开发框架,旨在简化微服务架构的构建。它提供了很多用于微服务架构的解决方案,如服务发现、负载均衡、配置管理、消息驱动等。

Spring Cloud 的启动过程依赖于 Spring Boot 启动过程,但它有自己的一些额外步骤,主要用于处理服务注册与发现、配置管理、服务通信等。下面是 Spring Cloud 启动的主要过程:

1. 启动类的执行(Main 方法)

与 Spring Boot 类似,Spring Cloud 的应用通常也从一个 main 方法开始。在这个方法中,调用 SpringApplication.run() 来启动应用。@SpringCloudApplication 注解是 Spring Cloud 的启动注解,它是一个组合注解,包含了:

  • @SpringBootApplication:包含了 Spring Boot 启动的所有功能。

  • @EnableDiscoveryClient 或 @EnableEurekaClient:启用服务发现客户端,这让服务能够注册到 Eureka 或其他服务注册中心。

  • @EnableCircuitBreaker:启用熔断器机制,防止调用失败时影响其他服务。

Spring Cloud 是基于 Spring Boot 构建的分布式系统的开发框架,旨在简化微服务架构的构建。它提供了很多用于微服务架构的解决方案,如服务发现、负载均衡、配置管理、消息驱动等。

Spring Cloud 的启动过程依赖于 Spring Boot 启动过程,但它有自己的一些额外步骤,主要用于处理服务注册与发现、配置管理、服务通信等。下面是 Spring Cloud 启动的主要过程:

1. 启动类的执行(Main 方法)

与 Spring Boot 类似,Spring Cloud 的应用通常也从一个 main 方法开始。在这个方法中,调用 SpringApplication.run() 来启动应用。@SpringCloudApplication 注解是 Spring Cloud 的启动注解,它是一个组合注解,包含了:

  • @SpringBootApplication:包含了 Spring Boot 启动的所有功能。

  • @EnableDiscoveryClient 或 @EnableEurekaClient:启用服务发现客户端,这让服务能够注册到 Eureka 或其他服务注册中心。

  • @EnableCircuitBreaker:启用熔断器机制,防止调用失败时影响其他服务。

2. SpringApplication.run() 方法

SpringApplication.run() 是 Spring Boot 启动的关键方法,也同样适用于 Spring Cloud,它会执行以下步骤:

  • 创建并配置 SpringApplication 对象。

  • 启动应用的上下文(ApplicationContext)。

  • 加载并初始化 Spring Boot 自动配置。

  • 初始化服务注册中心,开始与服务发现交互。

在 Spring Cloud 中,如果应用涉及服务注册与发现(如 Eureka 或 Consul),这一步会连接到注册中心,进行服务注册。

3. 服务发现与注册

如果应用是一个服务消费者或提供者,并且启用了服务发现机制(如 Eureka),则 Spring Cloud 会:

  • 启动服务发现客户端(如 EurekaClient),并自动注册到服务发现平台。

  • 每个服务都会将其实例信息(如 IP 地址、端口等)注册到服务注册中心(如 Eureka)。

  • 服务消费者在启动时会自动从服务注册中心获取服务列表,使用负载均衡进行调用。

4. 配置管理

Spring Cloud 提供了配置中心(如 Spring Cloud Config Server),允许将应用的配置从集中式的配置服务器中获取。Spring Cloud 在启动时会尝试连接配置服务器,并加载应用的配置。

如果启用了 Spring Cloud Config,启动时会从配置服务器获取配置信息(如数据库连接信息、微服务配置等),这些信息会被自动加载到 Spring 环境中。

5. 熔断器与负载均衡

Spring Cloud 还集成了 Netflix 的一些工具,如 Hystrix(熔断器)和 Ribbon(负载均衡),帮助处理服务调用的可靠性和负载均衡。启动时,Spring Cloud 会根据配置启动这些组件,确保服务调用的容错性。

  • Hystrix:通过 @EnableCircuitBreaker 注解启用熔断器,防止调用失败时影响其他服务。

  • Ribbon:启用客户端负载均衡,服务消费者会根据 Ribbon 自动选择一台合适的服务提供者。

Spring Cloud 启动过程的主要步骤如下:

  1. 执行 main 方法,调用 SpringApplication.run() 启动应用。

  2. 初始化 Spring Boot 的核心功能,加载配置。

  3. 启动服务注册与发现,服务注册到注册中心。

  4. 加载和初始化 Spring Cloud 相关功能(如配置中心、熔断器、负载均衡等)。

  5. 执行 CommandLineRunner 和 ApplicationRunner 中的代码(如果定义)。

  6. 启动完成,应用开始接受请求。



  • 提高并发性能

    • 使用异步处理Kafka消息发送,避免在主线程中阻塞,提升性能。

    • 可以通过增加并发线程池来处理多个消息,提高吞吐量。

  • 提高高可用性

    • 如果Kafka发送失败,可以进行重试或者备用处理,以确保消息不丢失。

    • 对于消息处理,可以考虑使用队列处理和死信队列的机制,避免消息丢失和延时。

  • 优化代码逻辑

    • 代码中没有处理异常情况,我们可以增加异常处理和日志记录。

    • 可以使用缓存机制减少频繁调用的数据库操作。

解释:

  1. 异步消息处理

  • 在 handle() 方法中,使用 ExecutorService 提交异步任务来处理 Kafka 消息的发送。这样可以避免消息处理阻塞主线程,提高系统的并发处理能力。