持久化
持久化可以提高RabbitMQ的可靠性,防止异常情况下的数据丢失。RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化。
交换器的持久化通过声明队列时将durable参数置为true实现。如果交换器不设置持久化,在RabbitMQ服务重启之后,相关交换器的元数据会丢失,但消息不会丢失,只是不能再将消息发送到这个交换器。一个长期使用的交换器来建议将其置为持久化的。
队列的持久化通过在声明队列时将durable参数置为true实现。如果队列不设置持久化,在RabbitMQ服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。
消息的持久化在消息的投递模式(BasicProperties中的DeliveryMode属性)设置为2即可实现消息的持久化。单单设置消息持久化而不设置队列的持久化毫无意义,只有同时设置队列和消息的持久化才能保证RabbitMQ服务重启后,消息依旧存在。
之前的示例出现过该代码:
var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; string message = "RabbitMQ Test"; //传递的消息内容 channel.BasicPublish("normalExchange", "normalKey", properties, Encoding.UTF8.GetBytes(message));
写入磁盘的速度比写入内存的速度慢得多,将所有的消息都设置为持久化,会严重影响RabbitMQ的性能(随机)。在选择是否要将消息持久化时,需要在可靠性和吐吞量之间做一个权衡。
将交换器、队列、消息都设置持久化并不能百分之百保证数据不丢失。消费者订阅消费队列时将autoAck参数设置为true,并在接收消息之后没来得及处理就发生宕机,这也算数据丢失。另外一种情况就是RabbitMQ接收到消息,在持久化(保存到磁盘)之前,服务节点发生了宕机、重启等异常情况,也会造成消息丢失。
生产者确认
在使用RabbitMQ的时候,我们还会考虑的一个问题就是消息的生产者将消息发送出去之后,消息有没有正确地到达服务器?默认情况下发送消息的操作不会返回任何信息给生产者,也就是默认情况下生产者是不能确认消息有没有正确地到达服务器。
RabbitMQ针对这个问题,提供了两种解决方式:
通过事务机制实现(与数据库中的事务概念并不相同);
通过发送方确认(publisher confirm)机制实现。
事务机制
事务机制相关的方法有:channel.TxSelect、channel.TxCommit和channel.TxRollback。channel.TxSelect将当前的信道设置成事务模式,channel.TxCommit用于提交事务,channel.TxRollback用于事务回滚。通过channel.TxSelect方法开启事务之后,发布消息给RabbitMQ了,如果事务提交成功,则消息一定到达了RabbitMQ,如果在事务提交之前由于RabbitMQ异常崩溃或者其他原因抛出异常,通过执行channel.TxRollback方法来实现事务回滚。
示例代码:
using (var channel = connection.CreateModel()) { channel.TxSelect(); try { //发送消息 channel.TxCommit(); } catch (Exception) { channel.TxRollback(); } }
对一个通道而言TxSelect只需执行一次,TxCommit和TxRollback则需要多次执行,如循环发送多条消息时BasicPublish、TxCommit和TxRollback方法包裹进循环内即可,TxSelect在循环外调用。
协议流转过程(左为事务确认右为事务回滚):
事务确实能够解决消息发送方和RabbitMQ之间消息确认的问题,但是事务机制(不管是确认还是回滚多了Tx.Select和Tx.Commit或Tx.Rollback四个步骤)会严重影响RabbitMQ的性能。事务机制在一条消息发送之后会使发送端阻塞,等待RabbitMQ回应之后才能继续发送下一条消息。
发送方确认机制
发送方确认(publisher confirm)机制是RabbitMQ提供的一个相比于事务机制的改进方案。
生产者将信道设置成confirm(确认)模式【调用channel.ConfirmSelect方法(即Confirm.Select命令)将信道设置为confirm模式】,一旦信道进入confirm模式,在该信道上面发布的消息都会被指派一个唯一ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),告知生产者消息已经正确到达目的地。如果消息和队列是可持久化的,确认消息会在消息写入磁盘之后发出。RabbitMQ回传给生产者的确认消息中的DeliveryTag包含了确认消息的序号,RabbitMQ也可以设置channel.BasicAck方法中的multiple参数,表示这个序号之前的所有消息都已经确认。
confirm模式有普通confirm、批量confirm和异步confirm三种模式。
普通confirm
调用WaitForConfirmsOrDie等待消息返回。如果消息nack或者超时则该方法将抛出异常。异常的处理通常包括记录错误消息和/或重新尝试发送消息。
示例代码:
channel.QueueDeclare("confirm_queue", false, false, false, null); var message = "Confirm Message"; var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; channel.BasicPublish("", "confirm_queue", properties, Encoding.UTF8.GetBytes(message)); // uses a 5 second timeout channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
消息的确认阻碍了所有后续消息的发布,显著减慢了发布速度,所以普通confirm是三种方式中性能最低的,但是是实现方式最简单的,对性能要求不高的程序很适合。
批量confirm
批量confirm表示我们发布一批消息,对整批消息执行等待确认。
批量confirm示例代码(一次确认10条消息):
for (int i = 0; i < 10; i++) { var msg = $"Confirm Message {i}"; channel.BasicPublish("", "confirm_queue", null, Encoding.UTF8.GetBytes(msg)); } channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
上面的代码只是最简单的示例,实际使用中可能需要增加消息缓存的代码,批量confirm异常时对这批消息进行重新发布,确认成功是情况消息缓存。
批量confirm出现返回Basic.Nack或者超时时,客户端需要将这一批次的消息全部重发会造成重复消息的情况,这一点需要注意。
异步confirm
异步confirm是为channel怎加两个事件BasicAcks和BasicNacks,分别用来处理RabbitMQ回传的Basic.Ack和Basic.Nack。两个事件的回调函数都有一个对应的 EventArgs 参数(BasicNackEventArgs类型) ,包含如下两个参数:
DeliveryTag:表示对应消息的序号
Multiple:表示是确认一条消息(false)还是确认当前序号前的所有消息(false)。
示例代码:
var outstandingConfirms = new ConcurrentDictionary<ulong, string>(); channel.BasicAcks += (sender, ea) => { if (ea.Multiple) { var confirmed = outstandingConfirms.Where(k => k.Key <= ea.DeliveryTag); foreach (var entry in confirmed) { outstandingConfirms.TryRemove(entry.Key, out _); } } else { outstandingConfirms.TryRemove(ea.DeliveryTag, out _); } }; channel.BasicNacks += (sender, ea) => { outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body); Console.WriteLine($"Message with body {body} has been nack-ed. Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple}"); //同理BasicAcks维护outstandingConfirms }; var msg = "Async Msg"; outstandingConfirms.TryAdd(channel.NextPublishSeqNo, msg); channel.BasicPublish("", "confirm_queue", null, Encoding.UTF8.GetBytes(msg));
上述代码定义了一个ConcurrentDictionary对象将消息和对应的消息序号关联起来,发布消息前通过channel.NextPublishSeqNo获取消息的序号,然后在回调确认时清理消息缓存字典。
消息分发
当RabbitMQ队列有多个消费者时,默认情况下,队列收到的消息将以轮询(round-robin)分发的方式发送给消费者,每条消息只发送给订阅列表里的一个消费者。如果有n个消费者,那么RabbitMQ会将第m条消息分发给第m%n(取余的方式)个消费者。如果个别消费者来不及消费那么多的消息,而其他消费者由于某些原因(比如业务逻辑简单、机器性能卓越等)很快地处理完了所分配到的消息,进而进程空闲,这就造成整体应用吞吐量的下降,此时轮询分发机制就不是那么的优雅了。可以借助channel.BasicQos方法限制允许信道上的消费者所能保持的最大未确认消息的数量,未确认消息达到上限后RabbitMQ就不会再向这个消费者再发送消息,直至消费者确认了某条消息。
BasicQos方法参数介绍:
prefetchSize,预取大小服务器将传递的最大内容量(以八位字节为单位),如果不受限制,则为0。默认值:0。
prefetchCount,服务器一次请求将传递的最大邮件数,如果没有限制,则为0。调用此方法时,该值必填。默认值:0
global,是否将设置应用于整个频道,而不是每个消费者
默认值:false,应用于本身(一个消费者)
true:应用于整个频道
关于BasicQos的示例RabbitMQ官方文档Fair Dispatch部分有完成的示例代码:https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html
消息顺序性
消息的顺序性是指消费者消费到的消息和发送者发布的消息的顺序是一致的。如果生产者发布的消息分别为msg1、msg2、msg3,那么消费者也是按照msg1、msg2、msg3的顺序进行消费的。
但是实际使用时会有很多情况打破消息顺序性,如生成者启用事务机制,某种原因进行了事务回滚由一个新线程补发消息以及消息设置了优先级等。消息的顺序必然不会和生产者发送消息的顺序一致。
如果要保证消息的顺序性,根据具体业务处理,比如在消息体内添加全局有序标识来实现。
Github
https://github.com/MayueCif/RabbitMQ