RabbitMQ 发布订阅

时间:2022-06-20 16:43:32

互联网公司对消息队列是深度使用者,因此需要我们了解消息队列的方方面面,良好的设计及深入的理解,更有利于我们对消息队列的规划。

当前我们使用消息队列中发现一些问题

1、实际上是异步无返回远程调用,由发布者定义队列,消费者订阅已定义的队列。

2、并没有体现解耦设计,而且开发人员间依然要像单体项目开发那样针对同一个功能不断沟通交互,提高了开发时间以及成本。

3、没有消息版本的实现,导致发布者服务和消费者服务必须一起更新。如果没有保持一致可能导致批量的合法消息被丢到死信队列,甚至可能要启动旧服务将旧版本的消息消费掉才能更新服务。并且开发人员要进入发布流程指导各服务的发布顺序。

4、订阅者的对象定义在“集群”,但现实中的确存在“节点”的订阅需求,如节点的配置更新、本地缓存的刷新等。

快速、高效的使用消息队列,不需要过多的沟通成本,是我们不断的追求。因此发布订阅映入我们眼帘,公司内部称为分布式事件。

RabbitMQ的发布订阅能解决我们的什么问题呢?

  1. 不需要发送端和接收端同时发布。相比较普通的队列方式,如果发送端发布,而消费端未发布,那么就会有大量的消息积压。
  2. 实现一次发布,多次处理。

RabbitMQ发布订阅模式:像使用邮箱一样,不需要发送端和接收端共同发布。

RabbitMQ 发布订阅

发布订阅基础概念:

Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息:

  1. fanout:所有bind到此exchange的queue都可以接收消息
  2. direct:通过routingKey和exchange决定的那个唯一的queue可以接收消息
  3. topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
  4. headers:通过headers 来决定把消息发给哪些queue(这个很少用)

fanout订阅发布模式(广播模式)

RabbitMQ 发布订阅

direct订阅发布模式(广播模式)

RabbitMQ 发布订阅

topic定义发布模式(广播模式)

RabbitMQ 发布订阅

实现发布订阅模式下消息的发布订阅主要有以下几个步骤:

  1. 创建消息会话IMQSession(前提连接IMQConnection已经建立好了)
  2. 声明要订阅的消息主题
  3. 声明消息主题订阅者
  4. 声明消息发送者
  5. 发送消息
  6. 主题订阅者接收消息
  7. 关闭主题订阅者
  8. 关闭会话

生产者发送广播是实时的,消费者需要提前等待生产者发生消息,这个又叫订阅发布,收音机模式,就像只有收音机打开了才能听到锁定的FM频道,但是如果在节目开始一段时间,再打开收音机的话,之前的节目就收听不到了。即订阅之前的消息都是收不到的。

发布订阅相关的执行命令:

rabbitmqctl list_exchanges  列出所有exchange

临时队列:

我们需要每次连接至mq的时候使用一个新队列,使用完了就销毁,这里可使用临时队列:

result = channel.queue_declare()

然后就可以通过result.method.queue获取临时队列名称,提供给消费者使用。 另外消费者用完后需要销毁,可添加一个exclusive选项:result = channel.queue_declare(exclusive=True)  代表该队列是排他性队列。

绑定:Binding

RabbitMQ 发布订阅

channel.queue_bind(exchange='logs',
queue=result.method.queue)

查看系统所有的绑定命令

rabbitmqctl list_binding

发布订阅相关的概念主要包括exchange、binding、routingkey、及queue。我们只需要按照步骤,使用合适的交换器类型,即可实现发布订阅的一对多消息处理。