前言
概述
大多数应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦的能力。
消息服务中两个重要概念:消息代理(message broker)和目的地(destination)。当消息发送者发送消息后,将由消息代理接管,消息代理保证消息传递到指定目的地。
消息队列主要有两种形式的目的地:
- 队列(queue):点对点消息通信(point-to-point)。
- 主题(topic):发布(publish)/订阅(subscribe)消息通信。
点对点:
- 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列。
- 消息只有唯一的发送者和接收者,但并不是只能有一个接收者。
发布/订阅:
- 发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时接收到消息。
两种规范
JMS(Java Message Service):
- Java 消息服务,基于 JVM 消息代理的规范。ActiveMQ、HornetMQ 是 JMS 的实现。
AMQP(Advanced Message Queuing Protocol):
- 高级消息队列协议,也是一个消息代理的规范,兼容 JMS。
- RabbitMQ 是 AMQP 的实现。
JMS | AMQP | |
---|---|---|
定义 | Java API | 网络级协议 |
跨语言 | 否 | 是 |
跨平台 | 否 | 是 |
Model | 提供 2 种消息模型:
|
提供了 5 种消息模型:
本质来讲,后四种和 JMS 的 Pub/Sub 模型没有太大区别,仅是在路由机制上做了更详细的区分。 |
支持消息类型 | 多种消息类型:
|
因其要支持跨语言跨平台,所以仅支持 byte[],当实际应用中有复杂的消息时,可以将消息序列化后发送。 |
综合 | HMS 定义了 Java API 层面的标准,在 Java 体系中,多个 client 均可通过 JMS 进行交互,不需要修改应用代码,但是其对跨平台支持较差。 | AMQP 定义了 wire-level 层的协议标准,天然具有跨平台、跨语言特性。 |
Spring 支持:
- spring-jms 提供了对 JMS 的支持。
- spring-rabbit 提供了对 AMQP 的支持。
- 需使用 ConnectionFactory 的实现来连接消息代理。
- 提供 JmsTemplate、RabbitTemplate 来操作消息。
- @JmsListener(JMS)和 @RabbitListener(AMQP)注解标注在方法上可监听消息代理发布的消息。
- @EnableJms、@EnableRabbit 开启支持。
SpringBoot 自动配置类:
- JMS 的自动配置类为 JmsAutoConfiguration。
- AMQP 的自动配置类为 RabbitAutoConfiguration。
几种场景
异步处理
场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种:串行方式、并行方式。
1、串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。
2、并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。
假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是 150 毫秒,并行的时间可能是 100 毫秒。
因为 CPU 在单位时间内处理的请求数是一定的,假设 CPU1 秒内吞吐量是 100 次。则串行方式 1 秒内 CPU 可处理的请求量是 7 次(1000/150)。并行方式处理的请求量是 10 次(1000/100)。
如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)很容易达到瓶颈。
3、引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:
按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒 20 QPS。比串行提高了 3 倍,比并行提高了 2 倍。
应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。
传统模式:
传统模式的缺点:
- 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败。
- 订单系统与库存系统耦合。
引入消息队列:
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。
假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。
流量削锋
场景说明:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
可以控制活动的人数。
可以缓解短时间内高流量压垮应用。
用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。
秒杀业务根据消息队列中的请求信息,再做后续处理。
该部分内容摘自 https://blog.csdn.net/cws1214/article/details/52922267。
RabbitMQ介绍
简介
RabbitMQ 采用 Erlang 语言开发,是 AMQP 的开源实现。Erlang 语言由 Ericson 设计,专门为开发 concurrent 和 distribution 系统的一种语言,在电信领域使用广泛。OTP(Open Telecom Platform)作为 Erlang 语言的一部分,包含了很多基于 Erlang 开发的中间件/库/工具,如 mnesia/SASL,极大方便了 Erlang 应用的开发。OTP 就类似于 Python 语言中众多的 module,用户借助这些 module 可以很方便的开发应用。
核心概念
Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 routing-key(路由键)、priority(相对于其他消息的优先级)、delivery-mode(标识指定消息是否需要持久性存储)等。
Publisher
消息的生产者,也是一个向交换机发布消息的客户端应用程序。
Exchange
交换器,用来接收生产者发送的消息并将这些消息路由到服务器中的队列,也是消息到达 Broker 的第一站,根据分发规则,匹配查询表中的路由键,分发消息到队列中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) 和 fanout (multicast)。
Queue
消息队列,用来保存信息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可以投入到一个或多个队列。消息一致在队列中,等待消费者连接到这个队列将其取走。
Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。路由器和队列的绑定可以是多对多的关系。
Connection
连接,Publisher/Consumer 和 Broker 之间的 TCP 连接。断开连接的操作只会在 client 端进行,Broker 不会断开连接,除非出现网络故障或 Broker 服务出现问题。
Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 TCP 连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接受消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念以复用一条 TCP 连接。
Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 '/'。
Broker
表示消息队列服务器实体,即接收和分发消息的应用,RabbitMQ Server 就是 Message Broker。
运行机制
AMQP中的消息路由
AMQP 中的消息路由与 JMS 存在一些差别,AMQP 中增加了 Exchange 和 Binding 的角色。生产者需要把消息发布到 Exchange,最终由 Exchange 转发到队列并被消费者接收,而 Binding 就决定了交换器会将消息转发到哪个队列。
Exchange类型
Exchange 分发消息时根据类型的不同分发策略有区别,目前共有四种类型:direct、fanout、topic、headers。headers 匹配 AMQP 消息的 header 而不是路由键,headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三中类型:
-
Direct Exchange
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致,交换器就将消息发送到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为"dog",则只转发 routing key 标记为 "dog" 的消息,不会转发 "dog.puppy",也不会转发"dog.guard"等。它是完全匹配、单播的模式。
- Fanout Exchange:每个发到 fanout 类型交换器的消息都会分发到所有绑定的队列中。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息时最快的。
- Topic Exchange:topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将由路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:"#"和"*"。"#" 匹配 0 个或多个单词,"*" 匹配一个单词。简单说就是根据 routing key 及通配规则将消息分发到目标队列中。
安装
参考【Docker 安装RabbitMQ】。
使用
准备
1、进入 RabbitMQ 的 web 可视化页,用 guest 用户登录,密码也为 guest。
2、新建如下测试队列:
3、新建如下测试交换器:
4、给新建的 direct 和 fanout 交换器新建如下绑定:
5、给新建的 topic 交换器新建如下绑定:
direct交换机测试
1、给“张三.msg”这个队列发送消息:
2、“张三.msg”接收消息:
fanout交换机测试
1、给所有绑定的队列发送消息:
2、所有队列都接收到消息:
topic交换器测试
1、给所有“姓张”的队列发送消息:
2、所有“姓张”的队列都接收到消息:
整合RabbitMQ
准备
1、使用 maven 新建 SpringBoot 项目,引入 Rabbit 、Web 场景启动器。
2、配置 RabbitMQ 连接信息:
spring.rabbitmq.host=192.168.202.136 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
application.properties
3、注解配置启用 RabbitMQ:
package com.springboot.config; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.context.annotation.Configuration; @Configuration @EnableRabbit // 启用 Rabbit public class MyAmqpConfig { }
com.springboot.config.MyAmqpConfig
4、新建测试 JavaBean:
package com.springboot.bean; import java.io.Serializable; import java.util.Date; public class User implements Serializable { private Integer id; private String name; private Date birthday; private String city; public User() { } public User(Integer id, String name, Date birthday, String city) { this.id = id; this.name = name; this.birthday = birthday; this.city = city; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Date getBirthday() { return birthday; } public void setBirthday(Date birthday) { this.birthday = birthday; } public String getCity() { return city; } public void setCity(String city) { this.city = city; } @Override public String toString() { return "User{" + "id=" + id + ", name='" + name + '\'' + ", birthday=" + birthday + ", city='" + city + '\'' + '}'; } }
com.springboot.bean.User
RabbitTemplate使用
下面通过 RabbitTemplate 来完成上述 RabbitMQ 在可视化界面中的几个测试操作:
package com.springboot; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitTemplateTests { // org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration 自动配置类中注册了 RabbitTemplate 的 bean @Autowired private RabbitTemplate rabbitTemplate; @Test public void test1(){ // 通过 direct 交换器给 “张三.msg” 队列发送消息 // send 方法的 message 参数中需要自己定义消息头和消息体 // rabbitTemplate.send(exchange,routingkey,message); rabbitTemplate.convertAndSend("my.direct","zhangsan.msg","你好 张三"); } @Test public void test2(){ // 接收 “张三.msg” 队列的消息 Object o = rabbitTemplate.receiveAndConvert("张三.msg"); System.out.println(o.toString()); /* 你好 张三 */ } @Test public void test3(){ // 通过 fanout 交换器给所有队列发送消息 rabbitTemplate.convertAndSend("my.fanout", "zhangsan.msg", "大家好"); } @Test public void test4(){ // 所有队列接收消息 Object msg1 = rabbitTemplate.receiveAndConvert("张三.msg"); System.out.println(msg1.toString()); Object msg2 = rabbitTemplate.receiveAndConvert("张四.msg"); System.out.println(msg2.toString()); Object msg3 = rabbitTemplate.receiveAndConvert("李三.msg"); System.out.println(msg3.toString()); Object msg4 = rabbitTemplate.receiveAndConvert("李四.msg"); System.out.println(msg4.toString()); /* 大家好 大家好 大家好 大家好 */ } @Test public void test5(){ // 通过 topic 交换器给所有“姓张”的队列发送消息 rabbitTemplate.convertAndSend("my.topic", "zhang.hello", "张先生 你好"); } @Test public void test6(){ // 所有“姓张”的队列接收消息 Object msg1 = rabbitTemplate.receiveAndConvert("张三.msg"); Object msg2 = rabbitTemplate.receiveAndConvert("张四.msg"); System.out.println(msg1); System.out.println(msg2); /* 张先生 你好 张先生 你好 */ } }
test
在上述的操作中操作的都是字符串,而通过 RabbitTemplate 是可以直接操作对象的,RabbitTemplate 内部的 Converter 会自动帮我们完成对象的序列化与反序列化:
package com.springboot; import com.springboot.bean.User; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.text.ParseException; import java.text.SimpleDateFormat; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitTemplateTests { @Autowired private RabbitTemplate rabbitTemplate; @Test public void test1() throws ParseException { // 直接发送一个对象 User user = new User(1, "张三", new SimpleDateFormat("yyyy-MM-dd").parse("1998-6-5"), "深圳"); rabbitTemplate.convertAndSend("my.direct","zhangsan.msg",user); } @Test public void test2(){ Object o = rabbitTemplate.receiveAndConvert("张三.msg"); System.out.println(o.getClass()); System.out.println(o); /* class com.springboot.bean.User User{id=1, name='张三', birthday=Fri Jun 05 00:00:00 CST 1998, city='深圳'} */ // 根据输出结果可以看到,获取的消息自动完成了反序列化转换为 java 对象 } }
test
查看 RabbitMQ 服务器中存储的对象,会发现存储的值为 RabbitMQ 以默认消息转换器 org.springframework.amqp.support.converter.SimpleMessageConverter 序列化后的值,如果我们需要存储的消息为 Json 格式,只需要自己注册一个 Json 格式消息转换器到容器即可,而 Spring 已经给我们提供了这个转换器:
package com.springboot.config; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @EnableRabbit public class MyAmqpConfig { @Bean public MessageConverter messageConverter(){ Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); return jackson2JsonMessageConverter; } }
com.springboot.config.MyAmqpConfig
此时再次执行上述操作,查看服务器中存储消息:
消息以转换为 Json 格式。
监听队列-@RabbitListener
Spring 也为我们提供了监听队列支持的注解 @RabbitListener,它能够帮我们很简便的创建一个监听服务,只需要标注在一个存放在 IoC 容器中实例的方法上。看如下示例:
1、创建一个服务类,注册到 IoC 容器,使用 @RabbitListener 注解标注在方法上:
package com.springboot.service; import com.springboot.bean.User; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class UserService { @RabbitListener(queues = {"张三.msg"}) // 监听指定队列消息 public void receiveUserMsg(User user) { // 接收自动反序列化后的对象 System.out.println(user); } @RabbitListener(queues = {"李四.msg"}) public void receiveMessage(Message message){ // 接收源消息信息 // 获得消息体 System.out.println(message.getBody()); // 获得消息属性信息 System.out.println(message.getMessageProperties()); } }
com.springboot.service.UserService
2、启动程序,运行单元测试中发送 User 对象方法,监听程序输出如下:
User{id=1, name='张三', birthday=Fri Jun 05 00:00:00 CST 1998, city='深圳'}
AmqpAdmin组件
Spring 自动注册了一个 AmqpAdmin 组件,它的作用类似于数据库中的 DDL 语句,可以用来帮我们定义(创建)交换器、队列。如下:
package com.springboot; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class AmqpAdminTests { @Autowired private AmqpAdmin amqpAdmin; @Test public void testDeclareExchange(){ // 创建一个交换器 Exchange exchange = new DirectExchange("my.directNew"); amqpAdmin.declareExchange(exchange);
} @Test public void testDeclareQueue(){ // 创建 Queue Queue queue = new Queue("myQueue"); amqpAdmin.declareQueue(queue);
} @Test public void testBinding(){ // 创建一个 binding ,绑定交换器与队列 amqpAdmin.declareBinding(new Binding("myQueue", Binding.DestinationType.QUEUE,"my.directNew","myQueue",null));
} }
test