工作队列-WorkQueue
实现功能:
将耗时的任务分发给多个工作者
设计思想:
避免直接去做一件资源密集型的任务,并且还得等它完成。因此将任务安排后再去做。将任务封装为一个消息,发到队列中。一个工作进程将在后台取出任务并最终完成。如果开启多个工作进程,任务将在这个多工作进程间共享
消息分发:
一、循环分发(Message acknowledement):
默认情况下,rabbitmq是轮流发消息给下一个消费者,平均每个cusumer接到的消息数量相等。
那么问题来了,当一个消息十分耗时,那么获得到这个消息的cusumer,在执行时崩溃导致任务未完成,那么这个消息就丢失了。为了保证出现类似问题,消息不消失。rabbitmq有一个消息确认机制。
二、消息确认机制(ack):
rabbitmq支持消息确认,为保证消息永不丢失,cusumer会发送一个确认消息告诉rabbitmq。代表我已接收消息,并处理完成。可以随时删除。
当一个cusumer在发送确认消息前死亡(连接或通道关闭,tcp连接丢失等),rabbitmq会认为该消息没有被完全处理并将其重新加入队列。如果此时有其他cusumer,rabbitmq很快会重新发送该消息到其他cusumer。通过这个方式保证没有消息丢失,及时某个cusumer意外死亡。
开启消息确认机制,rabbitmq默认打开。代码:autoAck = true(关掉)/false(打开)
注:对于rabbitmq而言,没有消息超时
当我们开启消息确认机制之后,可以保证cusumer死亡时不会丢失消息。但当rabbitmq服务关闭或崩溃后,会丢失所有的队列和消息。
那如何解决因服务关闭或崩溃造成的消息丢失呢?我们需要做三件事情:
持久化exchange(交换机):声明时指定durable = true
持久化queue(队列):声明时指定durable = true
持久化message(消息):在消息投递时指定delivery_mode = 2(1是非持久化)或将MessageProperties的值设置为PERSISTENT_TEXT_PALIN
注:rabbitmq不允许重新定义已经存在的队列的持久化,如上一章中我们设置的MyQueue队列。如果设置了该队列不持久化,那么我们不能再声明它持久化。不然会报错。我们必须重新声明一个新队列并声明持久化。
以上我们可以做一个小结:
1、rabbitmq在服务端没有声明队列和消息持久化时,队列和消息存在内存中,服务端宕机后都丢失。
2、服务端声明持久化,客户端想要接收消息,必须声明queue同时声明持久化,不然客户端执行报错。
三、公平分发(Fair dispatch):
在循环分发机制中,可能会发生一个cusumer接收的消息处理非常耗时,而另一个cusumer接收的消息非常快处理完。这会导致有的cusumer很忙有的很闲。
rabbitmq对此一概不知。因为它只是当消息进入队列就分发出去,并没有查看每个cusumer未返回消息确认的数量。
为了改变这种情况,rabbitmq提供了公平分发机制。使用basicQos()方法。将其参数prefetchCount设置为1。这样cusumer会告诉rabbitmq,不要同时发多个消息给我。每次只发一个,当我处理完消息并给你确认信息后,再发给我下一个。这时候rabbitmq会查看cusumer返回的确认,寻找空闲的cusumer发送消息。
注:当所有的cusumer都很忙,队列可能会被装满。这个情况必须留意。要么增加更多cusumer要么采取其他策略。