RabbitMQ之异步通信(黑马快速入门)

时间:2024-10-02 07:15:02

前面我们了用fengin客户端发送http请求

异步调用常见实现就是事件驱动模式

异步调用比同步调用的好处?

优势一:业务的解耦,以前我们直接去调另一个微服务,现在我们是通知给一个broker,第三者

之后我们支付服务还要增加新的需求,只需要订阅事件即可完成

优势二:响应快,我们微服务通知到Broker就完成任务,不关心其他的

优势三:不会说一个微服务提供者倒了,我们消费者就会卡在这个地方

优势四:流量削峰:

那么异步通信的缺点:

太过于依赖broker的性能

架构复杂,业务没有明显的流程线,不好追踪管理

MQ:MessageQueue即消息队列,即broker

可以使用docker ps -a命令列出所有docker容器,包括未运行的docker容器。

docker rm -f +id可以删除对应的容器

不同的虚拟主机之间逻辑分组,进行隔离

RabbitMQ中的几个概念:

•channel:操作MQ的工具

•exchange:路由消息到队列中

•queue:缓存消息

•virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组

基本队列模型:

一下这个demo是基于RabitMQ官方api,流程麻烦

建立连接什么的还需要硬编码到程序中 需要手动声明队列 创建channel操作消费者和发布者

控制台打印如下:说明什么?异步的机制,我在消费者中只是绑定了队列和消费者的行为(一个类似回调函数的东西),然后程序继续往下执行,这正是异步的机制

等待接收消息。。。。

接收到消息:【hello, rabbitmq!】

接收到消息:【hello, rabbitmq!】

重点:

官网:/projects/spring-amqp

通过这个springAMQP可以 解决硬编码 连接MQ服务写在配置文件中

不用再手动创建队列

你只需要再配置文件中配置如下:

spring:

rabbitmq:

host: 192.168.149.100 # rabbitMQ的ip地址

port: 5672 # 端口

username: itcast

password: 123321

virtual-host: /

spring帮助你进行连接创建channel创建等等。。。

通过一个模板类的对象发送消息 不过这个队列的话需要你手动创建

你只需要通过MQ模板类对象发送消息即可 真是太优雅了

服务器端查看队列中的消息

我们需要知道监听哪个队列 @RabbitListener(queues="")

以及监听到队列中有新消息后执行什么操作 编写方法

这些操作被封装到类中的方法

2.WORK_QUEUE

挂两个消费者,提高处理效率

问题是consumer1和2的一个分配策略?能者多得?

如果发布者每秒50条消息发送 消费者1每秒60,2每秒10那么1会自己1干吗?

策略是怎么样的?怎么设置?

rabbitmq消息预取机制:

预先消息无上限

队列中有大量的消息,channel会交替分配给1和2,先分配,先揽活是吧?干的快慢我不管的意思

怎么解决?

预取的消息设置为1就可以,但是一条消息要么给消费者1要么给消费者2,只能给一个消费者处理

listener:

simple:

prefetch: 1

  1. 上述的案例我们无法解决我们的问题,引入了发布订阅模式,我们需要发送一个消息,这个消息不能只只被一个消费者消费,由此我们引入交换机

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)

常见的交换机类型有一下三种:

第一种fanout:会把消息转发路由给每一个与其绑定的队列,这正符合我们的要求

所以我们的重点在于怎么将交换机和queue进行绑定

springamqp提供了一些列api让我们操作

我们先看接口及其继承关系

步骤:声明一个队列 声明一个交换机 将队列绑定到交换机 所用的api如上所示

不同的是 我们发送消息给交换机 不再是队列了

现在有个问题 我不能通过springamqp进行交换机和队列的创建?怎么回事?





路由交换机:

exchange和queue之间规定一个暗号bindingkey

生产者发送消息的时候指定一个routingkey

当routingkey==bindingkey,消息经过路由交换机转发到对应的队列上

所以有这种情况:当两个队列同时绑定到交换机的bindingkey暗号一样时候

我们发送消息时的routingkey要是和这两个暗号相等,就可以把消息转发到这两个队列中

所以,路由交换机更加灵活,可以模拟fanout交换机

之前的声明队列声明交换机以及绑定关系在一个configrutation中(一个spring上下文容器)

声明了很多个bean

很麻烦啊

现在可以通过

@RabbitListener(bindings = @QueueBinding(

value = @Queue(name= "direct.queue1"),

exchange = @Exchange(name = "", type = ExchangeTypes.DIRECT),

key = {"red", "blue"}

))

在消费者监听的地方,直接写消费者要绑定的队列名字

交换机名字,以及交换机的类型

还有最重要的bindingkey的值

发送的时候别忘了指定routingkey,当然还有交换机和消息

总结:Fanout与Direct的区别

————————————————————————————————————————————————————————————————————————————————————

最后一个交换机TopicExchange

topicexchange相较于directchange,它增加了通配符,降低交换机和队列中bindingkey的书写难度,

遇到一个问题:就是在

通过@RabbitListener中明明声明了队列交换机绑定了关系以及bindingkey

但是去rabbit去看发现没有相应的队列交换机

我的解决是先clean再重新build了该工程 再重新启动即可

发送消息

Topic交换机接收的消息RoutingKey必须是多个单词,以 .分割

接受消息:

•Topic交换机与队列绑定时的bindingKey可以指定通配符

•#:代表0个或多个词

•*:代表1个词

总结:

最后一个:

ctrl+p看一个方法可以接受哪些类型参数

我们可以看出,发送消息的方法中消息可以是object类型!

实际上说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送,使用的jdk自带的序列化器(看着像乱码)

所以我们指定转换器,序列化指定类型 例如json

消息发送的时候指定序列化器:把对象序列化成json

那么在消息接受时,要和发送时使用一样的序列化器

结果如下: