消息服务概述
为什么要使用消息服务
在多数应用尤其是分布式系统中,消息服务是不可或缺的重要部分,它使用起来比较简单,同时解决了不少难题,例如异步处理、应用解耦、流量削峰、分布式事务管理等,使用消息服务可以实现一个高性能、高可用、高拓展的系统。下面我们使用实际开发中的若干场景来分析和说明为什么要使用消息服务,以及使用消息服务的好处。
异步处理
场景说明:用户注册后,系统需要将信息写入数据库,并发送注册邮件和注册短信通知
在图8-1中,针对上述注册业务的场景需求,处理方法有3种。
1)串行处理方式:用户发送注册请求后,服务器会先将注册信息写入数据库,依次发送注册邮件和短信消息,服务器只有在消息处理完毕后才会将处理结果返回客户端。这种串行处理消息的方式非常耗时,用户体验不友好。
2)并行处理方式:用户发送注册请求后,将注册信息写入数据库,同时发送注册邮件和短信,最后返回给客户端,这种并行处理的方式在一定程度上提高了后台业务处理的效率,但如果遇到较为耗时的业务处理,仍然显得不够完善。
3)消息服务处理方式:可以在业务中嵌入消息服务进行业务处理,这种方式先将注册信息写入数据库,在极短的时间内将注册信息写入消息队列后即可返回响应信息。此时前端业务不需要理会不相干的后台业务处理,而发送注册邮件和短息的业务会自动读取消息队列中的相关信息进行后续业务处理。
应用解耦
场景说明:用户下单后,订单服务需要通知库存服务。
如果使用传统方式处理订单业务,用户下单后,订单服务会直接调用库存服务接口进行库存更新,这种方式有一个很大的问题是:一旦库存系统出现异常,订单服务会失败导致订单丢失。如果使用消息服务模式,订单服务的下订单消息会快速写入消息队列,库存服务会监听并读取到订单,从而修改库存。相较于传统方式,消息服务模式显得更加高效、可靠。
流量削峰
场景说明:秒杀活动是流量削峰的一种应用场景,由于服务器处理资源能力有限,因此出现峰值时很容易造成服务器宕机、用户无法访问的情况。为了解决这个问题,通常会采用消息队列缓冲瞬时高峰流量,对请求进行分层过滤,从而过滤掉一些请求。
针对上述秒杀业务的场景需求,如果专门增设服务器来应对秒杀活动期间的请求瞬时高峰的话,在非秒杀活动期间,这些多余的服务器和配置显得有些浪费;如果不进行有效处理的话,秒杀活动瞬时高峰流量请求有可能压垮服务,因此,在秒杀活动中加入消息服务是较为理想的解决方案。通过在应用前端加入消息服务,先将所有请求写入到消息队列,并限定一定的阈值,多余的请求直接返回秒杀失败,秒杀服务会根据秒杀规则从消息队列中读取并处理有限的秒杀请求。
分布式事务管理
场景说明:在分布式系统中,分布式事务是开发中必须要面对的技术难题,怎样保证分布式系统的请求业务处理的数据一致性通常是要重点考虑的问题。针对这种分布式事务管理的情况,目前较为可靠的处理方式是基于消息队列的二次提交,在失败的情况可以进行多次尝试,或者基于队列数据进行回滚操作。因此,在分布式系统中加入消息服务是一个既能保证性能不变,又能保证业务一致性的方案。
针对上述分布式事务管理的场景需求,如果使用传统方式在订单系统中写入订单支付成功信息后,再远程调用库存系统进行库存更新,一旦库存系统异常,很有可能导致库存更新失败而订单支付成功的情况,从而导致数据不一致。针对这种分布式系统的事务管理,通常会在分布式系统之间加入消息服务进行管理。订单支付成功后,写入消息表;然后定时扫描消息表消息写入到消息队列中,库存系统会立即读取消息队列中的消息进行库存更新,同时添加消息处理状态;接着,库存系统向消息队列中写入库存处理结果,订单系统会立即读取消息队列中的库存处理状态。接着,库存系统向消息队列中写入库存处理结果,订单系统会立即读取消息队列中的库存处理状态。如果库存服务处理失败,订单服务还会重复扫描并发送消息表中的消息,让库存系统进行最终一致性的库存更新。如果处理成功,订单服务直接删除消息表数据,并写入到历史消息表。
常用消息中间件介绍
消息队列中间件(简称消息中间件)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。目前开源的消息中间件有很多。
ActiveMQ
ActiveMQ是Apache公司出品的,采用Java语言编写的、完全基于JMS规范(Java Message Service)的、面向消息的中间件,它为应用程序提供高效、可拓展的、稳定的、安全的企业级消息通信。ActiveMQ丰富的API和多种集群构建模式使得它成为业界老牌的消息中间件,广泛应用于中小型企业中。相较于后续出现的RabbitMQ、RocketMQ、Kafka等消息中间件来说,ActiveMQ性能相对较弱,在如今的高并发、大数据处理的场景下显得力不从心,经常会出现一些问题,例如消息延迟、堆积、堵塞等。
RabbitMQ
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议(Advanced Message Queuing Protocol)实现。AMQP是为应对大规模并发活动而提供统一消息服务的应用层标准高级消息队列协议,专门为面向消息的中间件设计,该协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。正是基于AMQP协议的各种优势性能,使得RabbitMQ消息中间件在应用开发中越来越受欢迎。
Kafka
Kafka是由Apache软件基金会开发的一个开源流处理平台,它是一种高吞吐量的分布式发布订阅消息系统,采用Scala和Java语言编写,提供了快速、可拓展的、分布式的、分区的和可复制的日志订阅服务,其主要特定是追求高吞吐量,适用于产生大量数据的互联网服务的数据收集业务。
RocketMQ
RocketMQ是阿里巴巴公司开源产品,目前也是Apache公司的*项目,使用纯Java开发,具有高吞吐量、高可用、适合大规模分布式系统应用的特点。RocketMQ的思路起源于Kafka,对消息的可靠传输以及事务性做了优化,目前在阿里巴巴中被广泛应用于交易、充值、流计算、消息推送、日志流式处理场景,不过维护上稍微麻烦。
在实际项目技术选型时,在没有特别要求的场景下,通常会选择使用RabbitMQ作为消息中间件,如果针对的是大数据业务,推荐使用Kafka或者是RocketMQ作为消息中间件。
RabbitMQ消息中间件
RabbitMQ简介
RabbitMQ是基于AMQP协议的轻量级、可靠、可伸缩和可移植的消息代理,Spring使用RabbitMQ通过AMQP协议进行通信;在Spring Boot中对RabbitMQ进行了集成管理。
在所有的消息服务中,消息中间件都会作为一个第三方消息代理,接收发布者发布的消息,并推送给消息消费者。不同消息中间件内部转换消息的细节不同。
RabbitMQ的消息代理流程中有很多细节内容和内部组件,这里不必会组件的具体作用,先对整个流程梳理一遍。
- 消息发布者(Publisher,简称P)向RabbitMQ代理(Broker)指定的虚拟主机服务器(Virtual Host)发送消息。
- 虚拟主机服务器内部的交换器(Exchange,简称X)接收消息,并将消息传递并存储到与之绑定(Binding)的消息队列(Queue)中。
- 消息消费者(Consumer,简称C)通过一定的网络连接(Connection)与消息代理建立连接,同时为了简化开支,在连接内部使用了多路复用的信道进行消息的最终消费。
RabbitMQ工作模式介绍
RabbitMQ消息中间件针对不同的服务需求,提供了多种工作模式。
Work queues(工作队列模式)
在Work queues工作模式中,不需要设置交换器(RabbitMQ会使用内部默认交换器进行消息转换),需要指定唯一的消息队列进行消息传递,并且开源由多个消息消费者。在这种模式下,多个消息消费者通过轮询的方式依次接收消息队列中存储的消息,一旦消息被某个消费者接收,消息队列会将消息移除,而接收并处理消息的消费者必须在消费完一条消息后再准备接收下一条消息。
从上面的分析可以发现,Work queues工作模式适用于那些较为繁重,并且可以进行拆分处理的业务,这种情况下可以分派给多个消费者轮流处理业务。
Public/Subscribe(发布订阅模式)
在Public/Subscribe工作模式中,必须先配置一个fanout类型的交换器,不需要指定对应的路由键(Routing key),同时会将消息路由到每一个消息队列上,然后每个消息队列都可以对相同的消息进行接收存储,进而由各自消息队列关联的消费者进行消费。
从上面的分析可以发现,Publish/Subscribe工作模式适用于进行相同业务功能处理的场合。例如,用户注册成功后,需要同时发送邮件通知和短信通知,那么邮件服务消费者和短信服务消费者需要共同消费"用户注册成功"这一条消息。
Routing(路由模式)
在Routing工作模式中,必须先配置一个direct类型的交换器,并指定不同的路由键值(Routing key)将对应的消息从交换器路由到不同的消息队列进行存储,由消费者进行各自消费。
从上面的分析可以发现,Routing工作模式适用于进行不同类型消息分类处理的场合。例如,日志收集处理,用户可以配置不同的路由键值分别对不同级别的日志信息进行分类处理。
Topics(通配符模式)
在Topics工作模式中,必须先配置一个topic类型的交换器,并指定不同的路由键值(Routing key)将对应的消息从交换器路由到不同的消息队列进行存储,然后由消费者进行各自消费。Topics模式与Routing模式的主要在于:Topics模式设置的路由键是包括通配符的,其中,#匹配多个字符,*匹配一个字符,然后与其他字符一起使用.进行连接,从而组成动态路由键,在发送消息时可以根据需求设置不同的路由键,从而将消息路由到不同的消息队列。
通常情况下,Topics工作模式适用于根据不同需求动态传递处理业务的场合。例如一些订阅客户只接收邮件消息,一些订阅客户只接收短信消息,那么可以根据客户需求进行动态路由匹配,从而将订阅消息分发到不同的消息队列中。
RPC
RPC工作模式与Work queues工作模式主体流程相似,都需要设置交换器,需要指定唯一的消息队列进行消息传递。RPC模式与Work queues模式的主要不同在于:RPC模式是一个回环结构,主要针对分布式架构的消息传递。RPC模式与Work queues模式的主要不同在于:RPC模式是一个回环结构,主要针对分布式架构的消息传递业务,客户端Cilent先发送消息到消息队列,远程服务端Server获取消息,然后再写入另一个消息队列,向原始客户端Client相应消息处理结果。
RPC工作模式适用于远程服务调用的业务处理场合。例如,在分布式架构中必须考虑的分布式事务管理问题。
Headers
Headers工作模式在RabbitMQ所支持的工作模式中是较为少用的一种模式,其主体流程与Routing工作模式有些相似。不过,使用Headers工作模式时,必须设置一个headers类型的交换器,而不需要设置路由键,取而代之的是在Properties属性配置中的headers头信息中使用key/value的形式配置路由规则。由于Headers工作模式使用较少,官方文档也灭有详细说明。
上面的6中工作模式,有些可以嵌套使用,例如,在发布订阅模式中加入工作队列模式。其中Publish/Subscribe、Routing、Topics和RPC模式是开发中较为常用的工作模式。
RabbitMQ安装以及整合环境搭建
安装RabbitMQ
在使用RabbitMQ之前必须预先安装配置,参考RabbitMQ官网说明,RabbitMQ支持多平台安装,例如Linux、Windows、MacOS、Docker等。这里,我们为了方便开发使用Windows环境为例,介绍RabbitMQ的安装配置。
下载RabbitMQ
链接:https://pan.baidu.com/s/1REAC7btmaR7a-pLKfLGJqA
提取码:1234
在安装RabbitMQ之前需要Erlang语言包支持。
安装RabbitMQ
RabbitMQ安装包依赖于Erlang语言包的支持,所以需要先安装Erlang语言包,在安装RabbitMQ安装包。RabbitMQ安装包和Erlang语言包的安装都非常简单。(需要注意的是,安装Erlang语言包,必须以管理员的身份进行安装)。
在Windos环境下首先执行RabbitMQ的安装,系统环境变量中会自动增加一个变量名为ERLANG_HOME的变量配置,它的配置路径是Erlang选择安装的具体路径,无须手动修改,同时,RabbitMQ服务也会自动启动。如果是多次卸载安装的RabbitMQ,需要保证ERLANG_HOME环境的配置正确,同时保证RabbitMQ服务正常启动。
RabbitMQ可视化效果展示
查看开启服务
rabbitmq-plugins.bat list
可以看到managerment服务没有开启
开启可视化服务
rabbitmq-plugins enable rabbitmq_management
重启rabbitmqctl
rabbitmqctl.bat start_app
RabbitMQ默认提供了两个端口号5672和15672,其中5672作为服务端口号,15672用作可视化管理端口号。在浏览器*问http://localhost:15672通过可视化的方式查看RabbitMQ。
首次登录RabbitMQ可视化管理页面时需要进行用户登录,RabbitMQ安装过程中默认提供了用户名和密码均为guest的用户,可以使用该账户进行登录。登陆成功后会进入RabbitMQ可视化管理页面得首页。
RabbitMQ可视化管理页面中,显示出了RabbitMQ的版本、用户信息等信息,同时页面还包括Connections、Channeis、Exchanges、Queues、Admin在内的管理面板。
Spring Boot整合RabbitMQ环境搭建
完成RabbitMQ的安装后,下面我们开始对Spring Boot整合RabbitMQ实现消息服务需要的整合环境进行搭建。
1)创建Spring Boot项目。使用Spring Intializr方式创建一个名为chapter08的Spring Boot项目,在Dependercies依赖选择中选择Web模块中Web依赖以及Integertion模块中的RabbitMQ依赖。
2)编写配置文件,连接RabbitMQ服务。打开创建项目时自动生成的application.properties全局配置文件,在该文件中编写RabbitMQ服务对应的连接配置。
application.properties
#配置RabbitMQ消息中间件连接配置 spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #配置RabbitMQ虚拟主机路径/,默认可以省略 spring.rabbitmq.virtual-host=/
需要强调的是,在上述项目全局配置文件application.properties中,编写了外部RabbitMQ消息中间件的连接配置,这样在进行整合消息服务时,使用的都是我们自己安装配置的RabbitMQ服务。而在Spring Boot中,也集成了一个内部默认的RabbitMQ中间件,如果我们没有在配置文件中配置外部RabbtiMQ连接,会启用内部的RabbitMQ中间件,这种内部RabbitMQ中间件是不推荐使用的。
Spring Boot与RabbitMQ整合实现
Publish/Subscribe(发布订阅模式)
Spring Boot整合RabbitMQ中间件实现消息服务,注意围绕3个部分的工作进行展开:定制中间件、消息发送者发送消息、消息消费者接收消息。其中。定制中间件是比较麻烦的工作,且必须预先定制。下面我们以用户注册成功后同时发送邮件通知和短信通知这一场景为例,分别使用基于API、基于配置类和基于注解这3种方式实现Publish/Subscribe工作模式的整合。
基于API的方式
基于API的方式注意讲的是使用Spring框架提供的API管理类AmqpAdmin定制消息发送组件,并进行消息发送。这种定制消息发送组件的方式与RabbitMQ可视化界面上通过对应面板进行组件操作的实现基本一样,都是通过管理员的身份,预先手动声明交换器、队列、路由键等,然后组装消息队列供应用程序调用,从而实现消息服务。
1)使用AmqpAdmin定制消息发送组件
打开chapter08项目的测试类Chapter08ApplicationTests,在该测试类中先引入AmqpAdmin管理类定制Publish/Subscribe工作模式所需的消息组件。
Chapter08ApplicationTests.java
package com.example.chapter08; import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest class Chapter08ApplicationTests { @Autowired private AmqpAdmin amqpAdmin; @Test public void amqpAdmin() { //1.定义fanout类型的交换器 amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange")); //2.定义两个默认持久化队列,分别处理email和sms amqpAdmin.declareQueue(new Queue("fanout_queue_email")); amqpAdmin.declareQueue(new Queue("fanout_queue_sms")); //3.将队列分别与交换器进行绑定 amqpAdmin.declareBinding(new Binding("fanout_queue_email",Binding.DestinationType.QUEUE,"fanout_exchange","",null)); amqpAdmin.declareBinding(new Binding("fanout_queue_sms",Binding.DestinationType.QUEUE,"fanout_exchange","",null)); } }
使用Spring框架提供的消息管理组件AmqpAdmin定制了消息组件。其中amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));定义了一个fanout类型的交换器fanout_exchange。amqpAdmin.declareQueue(new Queue("fanout_queue_email")); amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));定义了两个消息队列fanout_queue_email和fanout_queue_sms,分别用来处理邮件信息和短信信息。
amqpAdmin.declareBinding(new Binding("fanout_queue_email",Binding.DestinationType.QUEUE,"fanout_exchange","",null)); amqpAdmin.declareBinding(new Binding("fanout_queue_sms",Binding.DestinationType.QUEUE,"fanout_exchange","",null));将定义的两个队列分别与交换器绑定。
执行上述单元测试方法amqpAdmin(),验证RabbitMQ消息组件的定制效果。
执行成功后,通过RabbitMQ可视化管理页面的Exchanges面板查看效果。
通过上述操作可以发现,在管理页面中提供了消息组件交换器、队列的定制功能。在程序中使用Spring框架提供的管理员API组件AmqpAdmin定制消息组件和管理页面上手动定制消息组件的本质是一样的。
2)消息发送者发送消息
完成消息组件的定制工作后,创建消息发送者发送消息到消息队列中。发送消息时,借助一个实体类传递消息,需要预先创建一个实体类对象。
首先,在chapter08项目中创建名为com.example.chapter.domain的包,并在该包下创建一个实体类User。
package com.example.chapter08.domain; public class User { private Integer id; private String username; public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } @Override public String toString() { return "User{" + "id=" + id + ", username='" + username + '\'' + '}'; } }
其次,在项目测试类Chapter08ApplicationTests中使用Spring框架提供的RabbitTemplate模板类是实现消息发送。
@Autowired private RabbitTemplate rabbitTemplate; @Test public void psubPublisher(){ User user=new User(); user.setId(1); user.setUsername("石头"); rabbitTemplate.convertAndSend("fanout_exchange","",user); }
上述代码中,先使用@Autowired注解引入了进行消息中间件管理的RabbitTemplate组件对象,然后使用该模板工具类的convertAndSend(String exchange, String routingKey, Object object)方法进行消息发布。其中,第一个参数表示发送消息的交换器,这个参数值要与之前定制的交换器名称一致;第二个参数表示路由键,因为实现的是Public/Subscribe工作模式,所以不需要指定;第3个参数是发送的消息内容,接收Object类型。
然后,执行测试方法。
显示消息发送过程中默认使用了SimpleMessageConverter转换器进行消息转换存储,该转换器只支持字符串或实体对象序列化后的消息。而测试类中发送的是User实体类对象消息,所以发生异常。
解决方法
执行JDK自带的Serializable序列化接口定制其他类型的消息转换器。
两种方法都可行,但是相对于第二种实现方式而言,第一种方式实现后的可视化效果较差,转换后的消息无法辨别,所以一般使用第二种方式。
在chapter08项目中创建名为com.example.chapter08.config的包,并在该包下创建一个RabbitMQ消息配置类RabbitMQConfig。
package com.example.chapter08.config; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
创建一个RabbitMQ消息配置类RabbitMQConfig,并在该配置类中通过@Bean注解自定义一个Jackson2JsonMessageConverter类型的消息转换器组件,该组件的返回值必须为MessageConverter类型。
再次执行psubPublisher()方法,该方法执行成功后,查看可视化界面
3)消息消费者接收消息
在chapter08项目中创建名为com.example.chapter08.service的包,并在该包下创建一个针对RabbitMQ消息中间件进行消息接收和处理的业务类RabbitMQService。
package com.example.chapter08.service; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class RabbitMQService { /* * Publish/Subscribe工作模式接收,处理邮件业务 * */ @RabbitListener(queues = "fanout_queue_email") public void psubConsumerEmail(Message message){ byte[] body=message.getBody(); String s=new String(body); System.out.println("邮件业务接收到消息:" + s); } /* * Publish/Subscribe工作模式接收,处理短信业务 * */ @RabbitListener(queues = "fanout_queue_sms") public void psubConsumerSms(Message message){ byte[] body=message.getBody(); String s=new String(body); System.out.println("短信业务接收到消息:" + s); } }
创建一个接受处理RabbitMQ消息的业务处理类RabbitMQService,在该类中使用Spring框架提供的@RabbitListener注解监听队列名称为fanout_queue_email和fanout_queue_sms的消息,监听这两个队列是前面指定发送并存储消息的消息队列。
需要说明的是,使用@RabbitListener注解监听队列消息后,一旦服务启动且监听到指定的队列有消息存在(目前两个队列中各有一条相同的消息),对应注解的方法会立即接收并消费队列中的消息。另外,在接受消息的方法中,参数类型可以与发送的消息类型保持一致,或者使用Object类型和Message类型。如果使用消息类型对应的参数接收消息的话,只能够得到具体的消息体信息;如果使用Object或者Message类型参数接收消息的话,还可以获得除了消息体外的参数信息MessageProperties。
案例中使用的是开发中常用的@RabbitLIsenter注解监听指定名称的消息情况,这种方式会在监听到指定队列存在消息后立即进行消费处理。除此之外,还可以使用RabbitTemplate模板类的receiveAndConvert(String queueName)方法手动消费指定队列中的消息。
基于配置类的方式
基于配置类的方式主要讲的是使用Spring Boot框架提供的@Configuration注解配置类定制消息发送组件,并进行消息发送。
RabbitMQConfig.java
package com.example.chapter08.config; import org.springframework.amqp.core.*; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { //自定义消息转化器 @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } //1.定义fanout类型的交换器 @Bean public Exchange fanout_exchange(){ return ExchangeBuilder.fanoutExchange("fanout_exchange").build(); } //2.定义两个不同名称的消息队列 @Bean public Queue fanout_queue_email(){ return new Queue("fanout_queue_email"); } @Bean public Queue fanout_queue_sms(){ return new Queue("fanout_queue_sms"); } //3.将两个不同名称的消息队列与交换器进行绑定 @Bean public Binding bindingEmail(){ return BindingBuilder.bind(fanout_queue_email()).to(fanout_exchange()).with("").noargs(); } @Bean public Binding bindingSms(){ return BindingBuilder.bind(fanout_queue_sms()).to(fanout_exchange()).with("").noargs(); } }
使用@Bean注解定制了3种类型的Bean组件,这3种组件分别表示交换器、消息队列和消息队列与绑定器的绑定。这种基于配置类方式定制的消息组件内容和基于API方式定制的消息组件内容完全一样,只不过是实现方式不同而已。
按照消息服务整合实现步骤,完成消息组件的定制后,还需要编写消息发送者和消息消费者,而在基于API的方式中已经实现类消息发送者和消息消费者,并基于配置类方式定制的消息组件名称和之前测试用的消息发送和消息组件名称都是一样的,所以这里可以直接重复使用。
重新运行消息发送者测试方法psubPublisher(),消息消费者可以自动监听并消费消息队列种存在的消息,效果与基于API的方式测试效果一样。
基于注解的方式
基于注解的方式指的是使用Spring框架的@RabbitListener注解定制消息发送组件并发送消息。
打开进行消息接收和处理的业务类RabbitMQService,将针对邮件业务和短线业务处理的消息消费者方式进行注解,使用@RabbitListener注解机器相关属性定制消息发送组件。
RabbitMQService.java
package com.example.chapter08.service; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class RabbitMQService { /* * Publish/Subscribe工作模式接收,处理邮件业务 * */ //@RabbitListener(queues = "fanout_queue_email") @RabbitListener(bindings = @QueueBinding(value= @Queue("fanout_queue_email") ,exchange = @Exchange(value = "fanout_exchange" ,type = "fanout"))) public void psubConsumerEmail(Message message){ byte[] body=message.getBody(); String s=new String(body); System.out.println("邮件业务接收到消息:" + s); } /* * Publish/Subscribe工作模式接收,处理短信业务 * */ //@RabbitListener(queues = "fanout_queue_sms") @RabbitListener(bindings = @QueueBinding(value= @Queue("fanout_queue_sms") ,exchange = @Exchange(value = "fanout_exchange" ,type = "fanout"))) public void psubConsumerSms(Message message){ byte[] body=message.getBody(); String s=new String(body); System.out.println("短信业务接收到消息:" + s); } }
至此,在Spring Boot中完成了使用基于API、基于配置类和基于注解3种方式来实现Publish/Subscribe工作模式的整合讲解。在这3种实现消息服务的方式中,基于API的方式相对简单、直观,但容易与业务代码产生耦合;基于配置类的方式相对隔离、容易统一管理、符合Spring Boot框架思想;基于注解的方式清晰明了、方便各自管理,但是也容易与业务代码产生耦合。在实际开发中,使用基于配置类的方式和基于注解的方式定制组件实现消息服务较为常见,使用基于API的方式偶尔使用。
Routing(路由模式)
使用基于注解的方式定制消息组件和消费者
RabbitMQService
/* * 2.1路由模式消息接收、处理error级别日志信息 * */ @RabbitListener(bindings = @QueueBinding( value = @Queue("routing_queue_error"), exchange = @Exchange(value = "routing_exchange",type = "direct"), key = "error_routing_key" )) public void routingConsumerError(String message){ System.out.println("接收到error级别日志消息:" + message); } /* * 2.2路由模式消息接收、处理info、error、warning级别日志信息 * */ public void routingConsumerAll(String message){ System.out.println("接收到info、error、warning等级别日志消息:" + message); }
上述代码中,在消息业务处理类RabbitMQService中新增了两个用来处理Routing路由模式的消息消费者方法,在两个消费者方式上使用@RabbitListener注解及其相关属性定制了路由模式下的消息服务组件。从示例代码可以看出,与发布订阅模式下的注解相比,Routing路由模式下的交换器类型type属性为direct,而且还必须指定key属性(每个消息队列可以映射多个路由键,而在Spring Boot 1.X版本中,@QueueBinding中的key属性只接收Spring类型而不接收Spring[]类型)。
消息发送者发送消息
打开项目测试类Chapter08ApplicationTests,在该测试类中使用RabbitTemplate模板类实现Routing路由模式下的消息发送。
// 2.Routing工作模式消息发送端 @Test public void routingPublish(){ rabbitTemplate.convertAndSend("routing_exchange","error_routing_key","routing send error message"); }
在路由工作模式下发送消息时,必须指定路由键参数,该参数要与消息队列映射的路由键保持一致,否则发送的消息将会丢失。本次示例中使用的是error_routing_key路由键,根据定制规则,编写的两个消息消费者方式应该都可以正常接收并消费该发送端的消息。
Topics(通配符模式)
使用基于注解的方式定制消息组件和消息消费者
/* * 3.1通配符模式消息接收、进行邮件业务订阅处理 * */ @RabbitListener(bindings = @QueueBinding( value = @Queue("topic_queue_email"), exchange = @Exchange(value = "topic_exchange",type = "topic"), key = "info.#.email.#" )) public void topicConsumerEmail(String message){ System.out.println("接收到邮件订阅需求处理消息:" + message); } /* * 3.2通配符消息接收、进行短信业务订阅处理 * */ @RabbitListener(bindings = @QueueBinding( value = @Queue("topic_queue_sms"), exchange = @Exchange(value = "topic_exchange",type = "topic"), key = "info.#.sms.#" )) public void topicConsumerSms(String message){ System.out.println("接收到短信订阅需求处理消息:" + message); }
@Test public void topicPublisher(){ rabbitTemplate.convertAndSend("topic_exchange","info.email","topics send email message"); }
到此这篇关于Java Spring Boot消息服务万字详解分析的文章就介绍到这了,更多相关Java Spring Boot消息服务内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://blog.csdn.net/shi_zi_183/article/details/120907864