生产者发送消息的过程
(1)生产者连接到RabbitMQ Broker建立一个连接(Connection),开启一个信道(Channel)
(2)生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等
(3)生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等
(4)生产者通过路由键将交换器和队列绑定起来
(5)生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息
(6)相应的交换器根据接收到的路由键查找相匹配的队列
(7)如果找到,则将从生产者发送过来的消息存入相应的队列中。
(8)如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
(9)关闭信道。
(10)关闭连接。
消费者接受消息的过程
(1)消费者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)。
(2)消费者向RabbitMQ Broker请求消费响应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作。
(3)等待RabbitMQ Broker回应并投递相应队列中的消息,消费者接受消息。
(4)消费者确认(ack)接收到的消息。
(5)RabbitMQ从队列中删除相应已经被确认的消息。
(6)关闭信道。
(7)关闭连接。
什么是信道?
信道是建立在Connection之上的虚拟连接,RabbitMQ处理的每条AMQP指令都是通过信道完成的。
为什么引入信道?
建立和销毁TCP连接是非常昂贵的开销,RabbitMQ采用类似NIO(Non-blockingI/O)的做法,选择TCP连接复用,不仅可以减少性能开销,同时也便于管理。
exchangeDeclare方法详解
exchange:交换器的名称
type:交换器的类型,常见的如fanout、direct、topic。
fanout不处理routingKey;
direct处理routingKey,完全匹配;
topic将routingKey和某模式匹配;
durable:设置是否持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
autoDelete:设置是否自动删除。自动删除的前提是至少有一个队里或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。
注意不能错误地把这个参数理解为:“当与此交换器链连接的客户端都断开时,RabbitMQ会自动删除本交换器”
internal:设置是否是内置的。内置的交换器,客户端程序无法直接发送消息到这个交换器,只能通过交换器路由到交换器这种方式。
argument:其他一些结构化参数,比如alternate-exchange
queueDeclare方法详解
queue:队列的名称
durable:设置是否持久化。
exclusive:设置是否排他。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:排他队列是基于连接(Connection)可见的,同一个连接的不同信道(Channel)是可以同时访问同一创建的排他队列。“首次”是指如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会自动删除,这种队列适用于一个客户端同时发送或读取消息的应用场景
autoDelete:设置是否自动删除。为true则设置队列为自动删除。自动删除的前提是:
至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。不能把这个参数错误地理解为:"当连接到此队列的所有客户端断开时,这个队列自动删除",因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。
arguments:设置队列的其他一些参数,如x-message-ttl、x-expires、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-dead-letter-routing-key,x-max-priority等。
queueBind方法详解
queue:队列名称:
exchange:交换器的名称:
routingKey:用来绑定队列和交换器的路由键;
argument:定义绑定的一些参数。
channel.basicQos方法
允许限制信道上的消费者所能保持的最大未确认消息的数量
basicPublish发送消息
exchange:交换器的名称,指明消息需要发送到哪个交换器中。如果设置为空字符串,则消息会被发送到RabbitMQ默认的交换器中。
routingKey:路由键,交换器根据路由键将消息存储到相应的队列之中。
props:消息的基本属性集,其包含14个属性成员,分别有contentType、contentEncoding、headers(Map<StringObject>)、deliveryMode、priority、correlationId、replyTo、expiration、messageId、timestamp、type、userId、appId、clusterId。其中常用的几种都在上面的示例中进行了演示。
byte[] body:消息体(payload),真正需要发送的消息
mandatory和immediate的详细内容请参考4.1
消费消息
RabbitMQ的消息模式分两种:推(Push)模式和拉(Pull)模式。推模式采用Basic.Consume进行消费,而拉模式则调用Basic.Get进行消费
在推模式中,可以通过持续订阅的方式来消费消息,使用到的相关类有:Consumer、DefaultConsumer
basicConsume方法
queue:队列的名称
autoAck:设置是否自动确认。
consumerTag:消费者标签,用来区分多个消费者。
noLocal:设置为true表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者。
exclusive:设置是否排他
arguments:设置消费者的其他参数
callback:设置消费者的回调函数。用来处理RabbitMQ推送过来的消息,比如DefaultConsumer,使用时需要客户端重写其中的方法。
拉模式
channel.basicGet
消费端的确认与拒绝
autoAck等于true时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费了这些消息。而为false,消费者就有足够的时间处理消息,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直等待持有消息直到消费者显式调用Basic.Ack命令为止
basicAck(envelope.getDeliveryTag(),false);告诉消息服务器来删除消息
basicReject消息拒绝
deliveryTag:可以看作消息的编号,64位长整型
requeue:true,则RabbitMQ会重新存入队列,false,则RabbitMQ会立即把消息从队列中移除
mandatory和immediate是channel.basicPublish方法中的两个参数,他们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。
mandatory:true,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。false,出现上述情形,则消息直接被丢弃。
immediate(3.0版本已去掉):true,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过Basic.Return返回至生产者
延迟队列
DLX和相应的死信队列:当相应的消息过期时,就会转存到相应的死信队列(即延迟队列)中,这样消费者根据业务自身的情况,分别选择不同延迟登记的延迟队列进行消费
vhost
每一个RabbitMQ服务器都能创建虚拟的消息服务器,我们称之为vhost虚拟主机,每个vhost本质上是一个独立的小型RabbitMQ服务器,拥有自己独立的队列、交换器及绑定关系等,并且它拥有自己独立的权限。
恢复机制
MQ挂了,客户端会隔一段时间重连
队列删除了,客户端会重新建一个