RabbitMQ介绍2 - AMQP协议

时间:2020-12-01 16:17:38

这一节介绍RabbitMQ的一些概念,当然也是AMQP协议的概念。官方网站也有详细解释,包括协议的命令: http://www.rabbitmq.com/tutorials/amqp-concepts.html

消息通信拓扑结构概念:生产者producer,消费者consumer,队列queue,交换器exchange,路由键routing key,绑定键binding key。

消息传播过程:producer发布消息给交换器,交换器根据路由规则,将消息放到绑定的队列,消费者从队列中得到消息。

RabbitMQ介绍2 - AMQP协议

Queue。队列是唯一存储消息的地方,一条消息路由后要么进入队列,要么被丢弃。

RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费。

RabbitMQ介绍2 - AMQP协议

多个消费者可以订阅同一个Queue,这时Queue中的消息会被按照round-robin分摊给多个消费者,而不是每个消费者都收到所有的消息。

RabbitMQ介绍2 - AMQP协议

Prefetch count。消费者从队列中订阅消息的时候,RabbitMQ会把队列中的全部消息推送到消费者,然后等待确认。通过设置prefetch count可以限定消息确认前最多发送给消费者的消息数量。在有多个消费者订阅一个队列的时候,设置这个参数可以帮助实现负载平衡。注意这里提到的订阅是持续订阅,通过basic.get进行的单次订阅不在此列。

创建队列。生产者和消费者都可以使用queue.declare创建队列,如果消费者在同一条信道上订阅了另一个队列的话,就无法再创建队列了。创建队列时如果不指定队列的名称,RabbitMQ会分配一个随机名称,并在queue.declare的结果中返回。创建队列如果已经存在,只要声明参数和现存队列完全一样,RabbitMQ什么都不做,创建过程成功返回,如果参数不匹配,会抛出异常。如果想检查队列是否存在,创建时设置参数passive为true,如果队列存在,创建成功返回,否则返回错误。

队列属性:exclusive,如果为true的话,只有你的程序才能消费队列。Auto-delete,如果为true的话,在最后一个消费者取消订阅的时候,队列会自动删除。

Exchange。生成者将消息发送到exchange,exchange根据消息的routing key,路由消息到相应的队列。不同的交换器类型exchange type提供不同的路由方案,exchange type有4种:direct,fanout,topic和headers。

1. direct。消息的routing key需要精确匹配binding key。

RabbitMQ介绍2 - AMQP协议

当生产者(P)发送的消息Rotuing key=booking时,发现Queue1和Queue2都符合,就会将消息传送给这两个队列,如果以Rotuing key=create或Rotuing key=confirm发送消息时,这时消息只会被推送到Queue2队列中,其他Routing Key的消息将会被丢弃。

注意RabbitMQ有一个默认Exchange,类型是direct,没有办法绑定一个queue到这个exchange,但是通过指定routing key,可以发送消息到同名的queue。

RabbitMQ介绍2 - AMQP协议

2. fanout。这个类型的exchange会发送消息到所有绑定在上面的queue,routing key和binding key不起作用。

RabbitMQ介绍2 - AMQP协议

生产者(P)生产消息1将消息1推送到Exchange,Exchange将消息推送到所有与它绑定Queue,最后两个消费者都会收到消息。

3. topic。Binding key可以使用*和#来对routing key进行模糊匹配。

  • 点号“. ”分隔的每一段字符串称为一个单词,如“quick.orange.rabbit”包含3个单词quick,orange,rabbit。点分割的部分可以为空,比如bindingkey=*.*的可以匹配routingkey是"."的消息,但是bindingkey=“*”无法匹配routingkey是空的消息。可以简单理解,先将routingkey按照stringsplit方法用"."分割,保留空白字符,*表示匹配分割后的一个位置,#表示匹配0…n个位置。其余字符需要精确匹配。
  • *表示一个单词,#表示0或多个单词。
  • binding key使用“*”与“#”来模糊匹配routing key。routing key中的“*”或“#”当做普通字符处理。

    RabbitMQ介绍2 - AMQP协议

    当生产者发送消息Routing Key=F.C.E时,只会被路由到Queue1中,如果Routing Key=A.C.E这时候会被同时路由到Queue1和Queue2中,如果Routing Key=A.F.B时,这里只会发送一条消息到Queue2中

    Will "*" binding catch a message sent with an empty routing key? 不会

    Will "#.*" catch a message with a string ".." as a key? Will it catch a message with a single word key? 都会

    How different is "a.*.#" from "a.#"? 前者routingkey要有"a.",后者只要有"a"就能匹配。

4. headers。headers交换器允许你匹配AMQP消息的header而非路由键,除此之外,和direct交换器完全一样,但是性能会差很多,因此并不太实用,而且几乎再也用不到了。使用方法:在绑定的时候提供一组键值对,如果消息的header(也是一组键值对)和其完全匹配,则路由消息到队列。

网络连接概念:ConnectionFactory, connection, channel信道。connectionFactory用来建立连接,connection表示一个TCP连接,channel是一个建立在connection上的虚拟连接。每条信道通过一个唯一的ID标记,发布消息、绑定、消费消息这些操作都是在信道上完成。connection和channel的关系类似于进程和线程。

问题:在同一个channel能订阅多个queue吗?生产者和消费者能使用同一个channel吗?同一个channel能给多个exchange发消息吗?

消息包括有效荷载payload和标签label。payload是需要传输的数据,可以是任何东西(C#编程的时候就是一个byte数组),label是传输数据的一些描述,比如routing key,持久化delivery mode。

消息持久化。exchange,queue,message都可以设置是否持久化durable,如果是持久化的,服务重启后会重建exchange、queue,消息会放到原来的queue里,重新发送给消费者。要注意的是,只有这3个都是持久化的,消息才会在RabbitMQ重启后还保留,任何一个环节不是持久化的,消息都不会恢复,也就是丢失了。持久化会可能会使吞吐量降低10倍(采用SSD存储可以大大缓解),而且在集群环境持久化队列不会在其它节点上重建。如果集群中一个节点崩溃,其上的持久化队列便从集群中消失了,在这个节点重启前,所有发送到这些队列的消息都会丢失。作为对比,非持久化队列在节点崩溃后,会在其它节点重建,消息便能发送到重建的队列上。所有,如果不是必须保留消息,不要用持久化。

消息确认。消费者接收到的每一条消息都必须进行确认。消费者必须通过AMQP的basic.ack命令显式地向RabbitMQ发送一个确认,或者在订阅到队列的时候设置auto_ack为true。auto_ack为true时,一旦消费者接收消息,RabbitMQ会自动认为其进行了确认。只有确认过的消息,才会从队列中删除。如果消费者接收了消息,但没有进行确认,消息会进入等待确认状态,这时候如果消费者的连接断开,RabbitMQ会清除这些消息的状态,并发送给队列的其它消费者。如果消费者一直不进行确认,消息会堆积在队列中,不会超时。消费者也可以明确拒绝一个消息(basic.reject命令),拒绝的时候如果requeue参数为true,消息会重新发送给下一个订阅的消费者,requeue为false时会从队列中删除消息,这时候的效果和发送确认类似,不同的是2.0以后版本的RabbitMQ会维护被拒绝的消息队列,以便后续调查。

AMQP事务transaction。由于发布消息不返回任何信息给生产者,你怎么知道服务器是否已经保存了持久化消息到硬盘呢?事务用来确认发送者的消息已经进入了队列(或者被丢弃)。在把信道设置为事务模式后,这条信道上的消息将按照串行方式发送,只有前面的消息确认成功了,才会执行后面的,直到提交事务,结束信道的事务模式。事务会极大的降低RabbitMQ的吞吐量(2~10倍)。

发送方确认模式。这是RabbitMQ的概念,不属于AMQP。类似AMQP事务的功能,但是不需要串行,极大的提高了吞吐量。使用方法:将信道设置为confirm模式,只有重新创建信道才能关闭该设置。所有信道上发送的消息都会指派一个唯一的ID(从1开始),一旦消息投递成功,信道会发送一个确认给生产者。生产者通过回调接收信道的确认消息(如果消息丢失,会收到nack),与此同时,生产者也可以继续发送消息。由于没有事务的消息回滚,发送方确认模式更加轻量,对RabbitMQ服务器的影响也可以忽略不计。

我们没有办法得到信道指派的这个ID(basic_publish不会返回ID信息),那怎么确认信道确认消息中的ID具体指的是哪个消息呢?方法是发送者自己维护消息和ID的关系。一个信道一般只有一个线程使用,ID从1开始,每次递增1,发送者完全可以自己算出消息的ID。