IntelliJ+SpringBoot项目实战(23)--整合RabbitMQ

时间:2024-12-04 12:48:50

 一、前言

        在上节课中介绍了Quartz定时作业实现定时任务处理。但是在实际的项目中,很多业务不能通过定时轮询的方式,比如订单下单付款了,通过定时去扫付款的订单去做后续业务处理的话,会非常影响性能。这个时候我们需要引入消息中间件产品进行异步处理,比如订单付款后,在付款的后台业务代码中产生一个异步消息通知,然后订阅消息的业务监听组件收到消息后,异步进行后续的处理。

        关于消息中间件产品有很多,目前主要有RabbitMQ、RocketMQ、Kafka等,这三者有什么差异呢?

RabbitMQ、RocketMQ和Kafka都是流行的开源消息队列系统,每个系统都有自己的特点和用途。

  1. RabbitMQ:

    高度可靠,支持AMQP(高级消息队列协议);用于系统间的异步通信和数据分发。支持消息持久化和高可用性。

  2. RocketMQ:

    设计为高可用和高并发的消息中间件;支持分布式事务消息和顺序消息。阿里巴巴的开源项目,广泛应用于金融行业。

  3. Kafka:设计为高吞吐量的分布式消息系统。适用于大数据场景下的实时数据处理。支持数据持久化和复制。

    对比RabbitMQ、RocketMQ和Kafka的特性,可以从以下几个方面进行考虑:

  • 可靠性:RabbitMQ和RocketMQ支持持久化消息,Kafka默认将消息持久化到磁盘。

  • 吞吐量:Kafka具有很高的吞吐量,能够处理大量的数据。

  • 延迟:Kafka具有较低的延迟,适合处理需要实时处理的数据。

  • 分布式:RocketMQ和Kafka支持分布式部署,可以横向扩展。

  • 开源协议:RabbitMQ支持AMQP,RocketMQ支持JMS,Kafka支持自有的TCP协议。

  • 成本:RabbitMQ和Kafka可能需要较高的硬件成本,RocketMQ可能更容易管理。

  • 社区支持:RabbitMQ和RocketMQ有较大的社区支持和商业支持。

        在选择消息队列时,需要根据具体的应用场景和需求来决定。例如,对于需要高吞吐量和低延迟的实时数据处理,Kafka可能是更好的选择;而对于需要高可靠性和复杂消息队列功能的场景,RabbitMQ或许是更合适的。

        因为RabbitMQ具有高的可靠性,所以我们在做业务处理的时候,可以选择RabbitMQ作为业务处理的中间件。下面介绍RabbitMQ的开发过程。

二、引入RabbitMQ依赖

        在openjweb-starter下的mq-openjweb-starter中引入RabbitMQ依赖:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        然后在openjweb-sys中引入依赖mq-openjweb-starter:

        <dependency>
            <groupId>org.openjweb</groupId>
            <artifactId>mq-openjweb-starter</artifactId>
            <version>0.0.1-SNAPSHOT</version>
            <scope>compile</scope>

        </dependency>

三、RabbitMQ消息队列开发       

        现在我们需要定义关于rabbitMQ的几个配置参数,在openjweb-sys中的application-dev.xml中增加下面的配置:

mq:
  rabbit:
    namePrefix: openjweb-cloud
    host: 127.0.0.1:5672
    username: guest
    password: guest

        上面参数中,namePrefix是定义MQ交换机、队列的名称前缀,host是rabbitMQ主机的ip和端口,username和password 分别是rabbitMQ登录用户和密码。

        然后我们在mq-openjweb-starter中定义一个属性类,用于加载上面的配置:

package org.openjweb.mq.starter.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@ConfigurationProperties(prefix = "mq.rabbit")
@Data
@Component
public class DefaultMqProperties {
    /**
     * rabbitMq 名称前缀(交换机、队列名称)
     */
    private String namePrefix ;

    /**
     * mq host
     */
    private String host  ;

    /**
     * mq 用户名
     */
    private String username  ;

    /**
     * mq 密码
     */
    private String password ;
}

        然后我们在mq-openjweb-starter中定义一个RabbitMQ的自动配置类RabbitMqAutoConfig:

package org.openjweb.mq.starter.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(DefaultMqProperties.class)
public class RabbitMqAutoConfig {

    @Autowired DefaultMqProperties defaultMqProperties;

    private CachingConnectionFactory connectionFactory;

    /**
     * 连接工厂使用自定义配置
     *
     * @param connectionFactory  connectionFactory
     * @param defaultMqProperties starhumanMqProperties
     */
    public RabbitMqAutoConfig(CachingConnectionFactory connectionFactory,
                              DefaultMqProperties defaultMqProperties) {
        connectionFactory.setUsername(defaultMqProperties.getUsername());
        connectionFactory.setPassword(defaultMqProperties.getPassword());
        connectionFactory.setAddresses(defaultMqProperties.getHost());
        connectionFactory.afterPropertiesSet();
        this.connectionFactory = connectionFactory;
        System.setProperty("spring.amqp.deserialization.trust.all","true");
        //更新常量

    }

  

    /**
     * 交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(  defaultMqProperties.getNamePrefix()+"_exchange", true, false);
    }

    /**
     * 延迟队列交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange lazyExchange() {
        DirectExchange directExchange = new DirectExchange(defaultMqProperties.getNamePrefix()+"_lazy_exchange",  true, false);
        directExchange.setDelayed(true);
        return directExchange;
    }

    /**
     * admin操作日志队列
     *
     * @return Queue
     */
    @Bean
    public Queue adminLogQueue() {
        return new Queue("LOG_QUEUE", true);
    }

    @Bean
    public Binding adminLogBinding() {
        return BindingBuilder.bind(adminLogQueue()).to(directExchange()).with("LOG_QUEUE");
    }
    /**
     * 连接工厂,单一消费者,发生异常丢弃消息
     */
    @Bean("factory_single_pass_err")
    public SimpleRabbitListenerContainerFactory starhumanMqFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        factory.setBatchSize(1);
        //跳过异常
        factory.setAcknowledgeMode(AcknowledgeMode.NONE);
        return factory;
    }

}

        代码说明:

        在上面的代码中,定义了2个交换机,directExchange()定义的交换机命名为openjweb_cloud_exchange,lazyExchange定义了一个交换机命名为openjweb_cloud_lazy_exchange,但是第二个设置了延迟加载为true,是延迟加载交换机,延迟加载需要使用延迟加载组件,后面介绍RabbitMQ安装的时候会介绍这个插件安装。

        在业务开发的时候,如果定义多个消息队列,比如订单下单成功消息、支付成功消息队列等。步骤是,首先增在这个类中增加一个类型为Queue的Bean:

    @Bean
    public Queue adminLogQueue() {
        return new Queue("LOG_QUEUE", true);
    }

        LOG_QUEUE 是自定义的消息队列的名字。

        第二步是将这个Bean及对应的消息队列绑定到一个交换机:

    @Bean
    public Binding adminLogBinding() {
        return BindingBuilder.bind(adminLogQueue()).to(directExchange()).with("LOG_QUEUE");
    }

        注意上面的写法。然后我们在openjweb-sys的模块中增加一个测试接口,用于产生消息:

package org.openjweb.sys.api;

import lombok.extern.slf4j.Slf4j;
import org.openjweb.mq.starter.config.DefaultMqProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * http://localhost:8001/demo/api/mq/test
 */
@Slf4j
@RequestMapping("/demo/api/mq")
@RestController
public class RabbitMQDemoApi {



    @Autowired
    DefaultMqProperties defaultMqProperties;
    @RequestMapping("test")
    public String test(){
        String host = defaultMqProperties.getHost();//读取成功
        return host;

    }

    @Resource
    private RabbitTemplate rabbitTemplate;

    //http://localhost:8001/demo/api/mq/newMsg
    //http://localhost:15672/#/

    @RequestMapping("newMsg")
    public String newMsg(){
        rabbitTemplate.convertAndSend( "openjweb-cloud_exchange", "LOG_QUEUE", "hello,i'm msg");
        //String host = defaultMqProperties.getHost();//读取成功
        return "success";

    }
}

        在上面的代码中,newMsg方法是向交换机 openjweb-cloud_exchange的LOG_QUEUE消息队列发送一个hello,i'm msg的消息,注意最后这个hello,i'm msg这个字符串参数也可以替换成其他的可序列化的对象,例如JSON,或者业务类的实例。

        另外我们再开发一个消息监听类,在openjweb-sys中实现一个消息监听类MqLogConsumer:

package org.openjweb.sys.mq;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * mq处理日志信息
 *
 * @author xd
 */
@Slf4j
@Component
public class MqLogConsumer {

    @RabbitListener(queues = "LOG_QUEUE", containerFactory ="factory_single_pass_err")
    public void adminLogBindingConsumer(String msg) {
        try {
            log.info("接收日志消息::::::");
            log.info(msg);
        } catch (Exception e) {
            //修改拼团活动团队状态失败

        }
    }
}

        运行测试:http://localhost:8001/demo/api/mq/newMsg 

        控制台输出信息:

        这样RabbitMQ的例子就运行成功,大家可以试着再参考上面的示例再增加一个消息队列Bean,另外发送消息的时候可以发送一个类实例来替代字符串。下面是RabbitMQ后台消息队列:

        下面是RabbitMQ后台的交换机:

四、RabbitMQ安装步骤 

        Linux环境中安装RabbitMQ需要按照下面的步骤:

(1)首先安装erlang。

(2)安装rabbitMQ。

(3)安装RabbitMQ的延迟加载组件。

        注意三者的版本要匹配。

        RabbitMQ的配置(vi /etc/rabbitmq/rabbitmq.config 写入以下配置信息

[

{rabbit, [

{ tcp_listeners, [ 5672 ] }, { ssl_listeners, [ ] },

{default_user, <<"guest">>}, {default_pass, <<"guest">>}, {loopback_users, []},

{auth_mechanisms, ['EXTERNAL']} ]

},

{ rabbitmq_management, [ { listener, [

{ port, 15672 }, { ssl, false }

]

}

] }

].

        在上面设置了rabbitMQ 的监听端口为5672,管理后台的端口为15672,创建的默认用户名和密码都是guest。

        网上下载RabbitMQ匹配版本的延迟加载组件,其后缀是.ez,放到RabbitMQ主目录的plugins下,然后启动RabbitMQ:

        systemctl start rabbitmq-server  

        systemctl enable rabbitmq-server

        在Windows环境下安装RabbitMQ,一是可以使用hyper-V虚拟机管理软件安装Linux虚拟机,另外就是下载Windows版的RabbitMQ,Windows版本的RabbitMQ更新的不是很及时,可以使用3.7.10版本的。网上找的rabbitmq_delayed_message_exchange-3.8.0.ez 延迟加载组件可以在3.7.10下使用。erlang版本对应的是21。

        网上下载延迟加载组件的地址:

Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub

3.7系列的RabbitMQ使用下面的插件:

        下载后,放到rabbitMQ安装目录的plugins目录下,如C:\rabbitmq\rabbitmq_server-3.7.10\plugins(实际目录根据本地安装情况做下修改)。

        RabbitMQ安装后,以管理员的身份运行下面的命令:

  • 输入指令激活插件:rabbitmq-plugins.bat enable rabbitmq_management
  • 延迟插件rabbitmq_delayed_message_exchange-3.8.0.ez复制到plugins目录,然后运行:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

  • rabbitmq_server-3.7.10\etc目录将rabbitmq.config.example改为rabbitmq.config,然后找到

去掉标红处的%%,就是启用配置,配置了guest用户和guest密码和default_vhost主机/

  •  重启服务器:net stop RabbitMQ && net start RabbitMQ

若出现服务启动之后闪退的情况,一般是因为启动了ActiveMq,导致Java.exe的程序占用了5672端口。关闭ActiveMq,解决此问题。

此时也可到服务中查看RabbitMQ,手动重启

若还未解决,可以查看报错日志C:\Users\Administrator\AppData\Roaming\RabbitMQ\log具体分析

登录验证:

后台登录地址:http://localhost:15672/

 

默认账户: guest  密码:guest

  • 由于账号guest具有所有的操作权限,并且又是默认账号,出于安全因素的考虑,guest用户只能通过localhost登陆使用

    将ebin目录下rabbit.app中loopback_users里的<<"guest">>删除,或者在配置文件rabbitmq.config中对该项进行配置,用以解决上述问题。

也可以新建用户管理 rabbitmq,常见用户管理命令:

  • 新增用户   rabbitmqctl  add_user  Username  Password
  • 删除用户  rabbitmqctl  delete_user  Username
  • 修改用户密码  rabbitmqctl  change_password  Username  Newpassword
  • 查看当前用户列表 rabbitmqctl  list_users

本示例完整代码可从github下载:

GitHub - openjweb/cloud at masterOpenJWeb is a java bases low code platform. Contribute to openjweb/cloud development by creating an account on GitHub.https://github.com/openjweb/cloud/tree/master