EQueue - 一个C#写的开源分布式消息队列的总体介绍

时间:2021-07-26 13:47:56

前言

本文想介绍一下前段时间在写enode时,顺便实现的一个分布式消息队列equeue。这个消息队列的思想不是我想出来的,而是通过学习阿里的rocketmq后,自己用c#实现了一个轻量级的简单版本。一方面可以通过写这个队列让自己更深入的掌握消息队列的一些常见问题;另一方面也可以用来和enode集成,为enode中的command和domain event的消息传递提供支持。目前在.net平台,比较好用的消息队列,最常见的是微软的MSMQ了吧,还有像rabbitmq也有.net的client端。这些消息队列都很强大和成熟。但当我学习了kafka以及阿里的rocketmq(早期版本叫metaq,自metaq 3.0后改名为rocketmq)后,觉得rocketmq的设计思想深深吸引了我,因为我不仅能理解其思想,还有其完整的源代码可以学习。但是rocketmq是java写的,且目前还没有.net的client端,所以不能直接使用(有兴趣的朋友可以为其写一个.net的client端),所以在学习了rocketmq的设计文档以及大部分代码后,决定自己用c#写一个出来。

项目开源地址:https://github.com/tangxuehua/equeue,项目中包含了队列的全部源代码以及如何使用的示例。也可以在enode项目中看到如何使用。

EQueue消息队列中的专业术语

Topic

一个topic就是一个主题。一个系统中,我们可以对消息划分为一些topic,这样我们就能通过topic,将消息发送到不同的queue。

Queue

一个topic下,我们可以设置多个queue,每个queue就是我们平时所说的消息队列;因为queue是完全从属于某个特定的topic的,所以当我们要发送消息时,总是要指定该消息所属的topic是什么。然后equeue就能知道该topic下有几个queue了。但是到底发送到哪个queue呢?比如一个topic下有4个queue,那对于这个topic下的消息,发送时,到底该发送到哪个queue呢?那必定有个消息被路由的过程。目前equeue的做法是在发送一个消息时,需要用户指定这个消息对应的topic以及一个用来路由的一个object类型的参数。equeue会根据topic得到所有的queue,然后根据该object参数通过hash code然后取模queue的个数最后得到要发送的queue的编号,从而知道该发送到哪个queue。这个路由消息的过程是在发送消息的这一方做的,也就是下面要说的producer。之所以不在消息服务器上做是因为这样可以让用户自己决定该如何路由消息,具有更大的灵活性。

Producer

就是消息队列的生产者。我们知道,消息队列的本质就是实现了publish-subscribe的模式,即生产者-消费者模式。生产者生产消息,消费者消费消息。所以这里的Producer就是用来生产和发送消息的。

Consumer

就是消息队列的消费者,一个消息可以有多个消费者。

Consumer Group

消费者分组,这可能对大家来说是一个新概念。之所以要搞出一个消费者分组,是为了实现下面要说的集群消费。一个消费者分组中包含了一些消费者,如果这些消费者是要集群消费,那这些消费者会平均消费该分组中的消息。

Broker

equeue中的broker负责消息的中转,即接收producer发送过来的消息,然后持久化消息到磁盘,然后接收consumer发送过来的拉取消息的请求,然后根据请求拉取相应的消息给consumer。所以,broker可以理解为消息队列服务器,提供消息的接收、存储、拉取服务。可见,broker对于equeue来说是核心,它绝对不能挂,一旦挂了,那producer,consumer就无法实现publish-subscribe了。

集群消费

集群消费是指,一个consumer group下的consumer,平均消费topic下的queue。具体如何平均可以看一下下面的架构图,这里先用文字简单描述一下。假如一个topic下有4个queue,然后当前有一个consumer group,该分组下有4个consumer,那每个consumer就被分配到该topic下的一个queue,这样就达到了平均消费topic下的queue的目的。如果consumer group下只有两个consumer,那每个consumer就消费2个queue。如果有3个consumer,则第一个消费2个queue,后面两个每个消费一个queue,从而达到尽量平均消费。所以,可以看出,我们应该尽量让consumer group下的consumer的数目和topic的queue的数目一致或成倍数关系。这样每个consumer消费的queue的数量总是一样的,这样每个consumer服务器的压力才会差不多。当前前提是这个topic下的每个queue里的消息的数量总是差不多多的。这点我们可以对消息根据某个用户自己定义的key来进行hash路由来保证。

广播消费

广播消费是指一个consumer只要订阅了某个topic的消息,那它就会收到该topic下的所有queue里的消息,而不管这个consumer的group是什么。所以对于广播消费来说,consumer group没什么实际意义。consumer可以在实例化时,我们可以指定是集群消费还是广播消费。

消费进度(offset)

消费进度是指,当一个consumer group里的consumer在消费某个queue里的消息时,equeue是通过记录消费位置(offset)来知道当前消费到哪里了。以便该consumer重启后继续从该位置开始消费。比如一个topic有4个queue,一个consumer group有4个consumer,则每个consumer分配到一个queue,然后每个consumer分别消费自己的queue里的消息。equeue会分别记录每个consumer对其queue的消费进度,从而保证每个consumer重启后知道下次从哪里开始继续消费。实际上,也许下次重启后不是由该consumer消费该queue了,而是由group里的其他consumer消费了,这样也没关系,因为我们已经记录了这个queue的消费位置了。所以可以看出,消费位置和consumer其实无关,消费位置完全是queue的一个属性,用来记录当前被消费到哪里了。另外一点很重要的是,一个topic可以被多个consumer group里的consumer订阅。不同consumer group里的consumer即便是消费同一个topic下的同一个queue,那消费进度也是分开存储的。也就是说,不同的consumer group内的consumer的消费完全隔离,彼此不受影响。还有一点就是,对于集群消费和广播消费,消费进度持久化的地方是不同的,集群消费的消费进度是放在broker,也就是消息队列服务器上的,而广播消费的消费进度是存储在consumer本地磁盘上的。之所以这样设计是因为,对于集群消费,由于一个queue的消费者可能会更换,因为consumer group下的consumer数量可能会增加或减少,然后就会重新计算每个consumer该消费的queue是哪些,这个能理解的把?所以,当出现一个queue的consumer变动的时候,新的consumer如何知道该从哪里开始消费这个queue呢?如果这个queue的消费进度是存储在前一个consumer服务器上的,那就很难拿到这个消费进度了,因为有可能那个服务器已经挂了,或者下架了,都有可能。而因为broker对于所有的consumer总是在服务的,所以,在集群消费的情况下,被订阅的topic的queue的消费位置是存储在broker上的,存储的时候按照不同的consumer group做隔离,以确保不同的consumer group下的consumer的消费进度互补影响。然后,对于广播消费,由于不会出现一个queue的consumer会变动的情况,所以我们没必要让broker来保存消费位置,所以是保存在consumer自己的服务器上。

EQueue是什么?

EQueue - 一个C#写的开源分布式消息队列的总体介绍

通过上图,我们能直观的理解equeue。这个图是从rocketmq的设计文档中拿来的,呵呵。由于equeue的设计思想完全和rocketmq一致,所以我就拿过来用了。每个producer可以向某个topic发消息,发送的时候根据某种路由策略(producer可自定义)将消息发送到某个特定的queue。然后consumer可以消费特定topic下的queue里的消息。上图中,TOPIC_A有两个消费者,这两个消费者是在一个group里,所以应该平均消费TOPIC_A下的queue但由于有三个queue,所以第一个consumer分到了2个queue,第二个consumer分到了1个。对于TOPIC_B,由于只有一个消费者,那TOPIC_B下的所有queue都由它消费。所有的topic信息、queue信息、还有消息本身,都存储在broker服务器上。这点上图中没有体现出来。上图主要关注producer,consumer,topic,queue这四个东西之间的关系,并不关注物理服务器的部署架构。

关键问题的思考

1.Producer,Broker,Consumer三者之间如何通信

由于是用c#实现,且因为一般是在局域网内部署,为了实现高性能通信,我们可以利用异步socket来通信。.net本身提供了很好的异步socket通信的支持;我们也可以用zeromq来实现高性能的socket通信。本来想直接使用zeromq来实现通信模块就好了,但后来自己学习了一下.net自带的socket通信相关知识,发现也不难,所以就自己实现了一个,呵呵。自己实现的好处是我可以自己定义消息的协议,目前这部分实现代码在ecommon基础类库中,是一个独立的可服用的与业务场景无关的基础类库。有兴趣的可以去下载下来看看代码。经过了自己的一些性能测试,发现通信模块的性能还是不错的。一台broker,四台producer同时向这个broker发送消息,每秒能发送的消息4W没有问题,更多的producer还没测试。

2.消息如何持久化

消息持久化方面主要考虑的是性能问题,还有就是消息如何快速的读取。

1. 首先,一台broker上的消息不需要一直保存在该broker服务器上,因为这些消息总会被消费掉。根据阿里rocketmq的设计,默认会1天删除一次已经被消费过的消息。所以,我们可以理解,broker上的消息应该不会无限制增长,因为会被定期删除。所以不必考虑一台broker上消息放不下的问题。

2. 如何快速的持久化消息?一般来说,我觉得有两种方式:1)顺序写磁盘文件;2)用现成的key,value的nosql产品来存储;rocketmq目前用的是自己写文件的方式,这种方式的难点是写文件比较复杂,因为所有消息都是顺序append到文件末尾,虽然性能非常高,但复杂度也很高;比如所有消息不能全写在一个文件里,一个文件到达一定大小后需要拆分,一旦拆分就会产生很多问题,呵呵。拆分后如何读取也是比较复杂的问题。还有由于是顺序写入文件的,那我们还需要把每一个消息在文件中的起始位置和长度需要记录下来,这样consumer在消费消息时,才能根据offset从文件中拿到该消息。总之需要考虑的问题很多。如果是用nosql来持久化消息,那可以省去我们写文件时遇到的各种问题,我们只需要关心如何把消息的key和该消息在queue中的offset对应起来即可。另外一点疑问是,queue里的信息要持久化吗?先要想清楚queue里放的是什么东西。当broker接收到一个消息后,首先肯定是要先持久化,完成后需要把消息放入queue里。但由于内存很有限,我们不可能把这个消息直接放入queue里,我们其实要放的只需要时该消息在nosql里的key即可,或者如果是用文件来持久化,那放的是该消息在文件中的偏移量offset,即存储在文件的那个位置(比如是哪个行号)。所以,实际上,queue只是一个消息的索引。那有必要持久化queue吗?可以持久化,这样毕竟在broker重启的时候,恢复queue的时间可以缩短。那需要和持久化消息同步持久化吗?显然不需要,我们可以异步定时持久化每个queue,然后恢复queue的时候,可以先从持久化的部分恢复,然后再把剩下的部分通过持久化的消息来补充以达到queue因为异步持久化而慢的部分可以追平。所以,经过上面的分析,消息本身都是放在nosql中,queue全部在内存中。

那消息如何持久化呢?我觉得最好的办法是让每个消息有一个全局的顺序号,一旦消息被写入nosql后,该消息的全局顺序号就确定了,然后我们在更新对应的queue的信息时,把该消息的全局顺序号传给queue,这样queue就能把queue自己对该消息的本地顺序号和该消息的全局顺序号建立映射关系。相关代码如下:

public MessageStoreResult StoreMessage(Message message, int queueId)
{
var queues = GetQueues(message.Topic);
var queueCount = queues.Count;
if (queueId >= queueCount || queueId < 0)
{
throw new InvalidQueueIdException(message.Topic, queueCount, queueId);
}
var queue = queues[queueId];
var queueOffset = queue.IncrementCurrentOffset();
var storeResult = _messageStore.StoreMessage(message, queue.QueueId, queueOffset);
queue.SetMessageOffset(queueOffset, storeResult.MessageOffset);
return storeResult;
}

没什么比代码更能说明问题了,呵呵。上的代码的思路是,接收一个消息对象和一个queueId,queueId表示当前消息要放到第几个queue里。然后内部逻辑是,先获取该消息的topic的所有queue,由于queue和topic都在内存,所以这里没性能问题。然后检查一下当前传递进来的queueId是否合法。如果合法,那就定位到该queue,然后通过IncrementCurrentOffset方法,将queue的内部序号加1并返回,然后持久化消息,持久化的时候把queueId以及queueOffset一起持久化,完成后返回一个消息的全局序列号。由于messageStore内部会把消息内容、queueId、queueOffset,以及消息的全局顺序号一起作为一个整体保存到nosql中,key就是消息的全局序列号,value就是前面说的整体(被序列化为二进制)。然后,在调用queue的SetMessageOffset方法,把queueOffset和message的全局offset建立映射关系即可。最后返回一个结果。messageStore.StoreMessage的内存实现大致如下:

public MessageStoreResult StoreMessage(Message message, int queueId, long queueOffset)
{
var offset = GetNextOffset();
_queueCurrentOffsetDict[offset] = new QueueMessage(message.Topic, message.Body, offset, queueId, queueOffset, DateTime.Now);
return new MessageStoreResult(offset, queueId, queueOffset);
}

GetNextOffset就是获取下一个全局的消息序列号,QueueMessage就是上面所说的“整体”,因为是内存实现,所以就用了一个ConcurrentDictionary来保存一下queueMessage对象。如果是用nosql来实现messageStore,则这里需要写入nosql,key就是消息的全局序列号,value就是queueMessage的二进制序列化数据。通过上面的分析我们可以知道我们会将消息的全局序列号+queueId+queueOffset一起整体作为一条记录持久化起来。这样做有两个非常好的特性:1)实现了消息持久化和消息在queue中的位置的持久化的原子事务;2)我们总是可以根据这些持久化的queueMessage还原出所有的queue的信息,因为queueMessage里包含了消息和消息在queue的中的位置信息;

基于这样的消息存储,当某个consumer要消费某个位置的消息时,我们可以通过先通过queueId找到queue,然后通过消息在queueOffset(由consumer传递过来的)获取消息的全局offset,然后根据该全局的offset作为key从nosql拿到消息。实际上现在的equeue是批量拉取消息的,也就是一次socket请求不是拉一个消息,而是拉一批,默认是32个消息。这样consumer可以用更少的网络请求拿到更多的消息,可以加快消息消费的速度。

3.Producer发送消息时的消息路由的细节

producer在发送消息时,如何知道当前topic下有多少个queue呢?每次发送消息时都要去broker上查一下吗?显然不行,这样发送消息的性能就上不去了。那怎么办呢?就是异步,呵呵。producer可以定时向broker发送请求,获取topic下的queue数量,然后保存起来。这样每次producer在发送消息时,就只要从本地缓存里拿即可。因为broker上topic的queue的数量一般不会变化,所以这样的缓存很有意义。那还有一个问题,当前producer第一次对某个topic发送消息时,queue哪里来呢?因为定时线程不知道要向broker拿哪个topic下的queue数量,因为此时producer端还没有一个topic呢,因为一个消息都还没发送过。那就是需要判断一下,如果当前topic没有queue的count信息,则直接从broker上获取queue的count信息。然后再缓存起来,在发送当前消息。然后第二次发送时,因为缓存里已经有了该消息,所以就不必再从broker拿了,且后续定时线程也会自动去更新该topic下的queue的count了。好,producer有了topic的queue的count,那用户在发送消息时,框架就能把这个topic的queueCount传递给用户,然后用户就能根据自己的需要将消息路由到第几个queue了。

4.consumer负载均衡如何实现

consumer负载均衡的意思是指,在消费者集群消费的情况下,如何让同一个consumer group里的消费者平均消费同一个topic下的queue。所以这个负载均衡本质上是一个将queue平均分配给consumer的过程。那么怎么实现呢?通过上面负载均衡的定义,我们只要,要做负载均衡,必须要确定consumer group和topic;然后拿到consumer group下的所有consumer,以及topic下的所有queue;然后对于当前的consumer,就能计算出来当前consumer应该被分配到哪些queue了。我们可以通过如下的函数来得到当前的consumer应该被分配到哪几个queue。

public class AverageAllocateMessageQueueStrategy : IAllocateMessageQueueStrategy
{
public IEnumerable<MessageQueue> Allocate(string currentConsumerId, IList<MessageQueue> totalMessageQueues, IList<string> totalConsumerIds)
{
var result = new List<MessageQueue>(); if (!totalConsumerIds.Contains(currentConsumerId))
{
return result;
} var index = totalConsumerIds.IndexOf(currentConsumerId);
var totalMessageQueueCount = totalMessageQueues.Count;
var totalConsumerCount = totalConsumerIds.Count;
var mod = totalMessageQueues.Count() % totalConsumerCount;
var size = mod > 0 && index < mod ? totalMessageQueueCount / totalConsumerCount + 1 : totalMessageQueueCount / totalConsumerCount;
var averageSize = totalMessageQueueCount <= totalConsumerCount ? 1 : size;
var startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
var range = Math.Min(averageSize, totalMessageQueueCount - startIndex); for (var i = 0; i < range; i++)
{
result.Add(totalMessageQueues[(startIndex + i) % totalMessageQueueCount]);
} return result;
}
}

函数里的实现就不多分析了。这个函数的目的就是根据给定的输入,返回当前consumer该分配到的queue。分配的原则就是平均分配。好了,有了这个函数,我们就能很方便的实现负载均衡了。我们可以对每一个正在运行的consumer内部开一个定时job,该job每隔一段时间进行一次负载均衡,也就是执行一次上面的函数,得到当前consumer该绑定的最新queue。因为每个consumer都有一个groupName属性,用于表示当前consumer属于哪个group。所以,我们就可以在负载均衡时到broker获取当前group下的所有consumer;另一方面,因为每个consumer都知道它自己订阅了哪些topic,所以有了topic信息,就能获取topic下的所有queue的信息了,有了这两样信息,每个consumer就能自己做负载均衡了。先看一下下面的代码:

_scheduleService.ScheduleTask(Rebalance, Setting.RebalanceInterval, Setting.RebalanceInterval);
_scheduleService.ScheduleTask(UpdateAllTopicQueues, Setting.UpdateTopicQueueCountInterval, Setting.UpdateTopicQueueCountInterval);
_scheduleService.ScheduleTask(SendHeartbeat, Setting.HeartbeatBrokerInterval, Setting.HeartbeatBrokerInterval);

每个consumer内部都会启动三个定时的task,第一个task表示要定时做一次负载均衡;第二个task表示要定时更新当前consumer订阅的所有topic的queueCount信息,并把最新的queueCount信息都保存在本地;第三个task表示当前consumer会向broker定时发送心跳,这样broker就能通过心跳知道某个consumer是否还活着,broker上维护了所有的consumer信息。一旦有新增或者发现没有及时发送心跳过来的consumer,就会认为有新增或者死掉的consumer。因为broker上维护了所有的consumer信息,所以他就能提供查询服务,比如根据某个consumer group查询该group下的consumer。

通过这三个定时任务,就能完成消费者的负载均衡了。先看一下Rebalance方法:

private void Rebalance()
{
foreach (var subscriptionTopic in _subscriptionTopics)
{
try
{
RebalanceClustering(subscriptionTopic);
}
catch (Exception ex)
{
_logger.Error(string.Format("[{0}]: rebalanceClustering for topic [{1}] has exception", Id, subscriptionTopic), ex);
}
}
}

代码很简单,就是对每个订阅的topic做负载均衡处理。再看一下RebalanceClustering方法:

上面的代码不多分析了,就是先根据consumer group和topic获取所有的consumer,然后对consumer做排序处理。之所以要做排序处理是为了确保负载均衡时对已有的分配情况尽量不发生改变。接下来就是从本地获取topic下的所有queue,同样根据queueId做一下排序。然后就是调用上面的分配算法计算出当前consumer应该分配到哪些queue。最后调用UpdatePullRequestDict方法,用来对新增或删除的queue做处理。对于新增的queue,要创建一个独立的worker线程,开始从broker拉取消息;对于删除的queue,要停止其对应的work,停止拉取消息。

通过上面的介绍和分析,我们大家知道了equeue是如何实现消费者的负载均衡的。我们可以看出,因为每个topic下的queue的更新是异步的定时的,且负载均衡本身也是定时的,且broker上维护的consumer的信息也不是事实的,因为每个consumer发送心跳到broker不是实时发送的,而是比如每隔5s发送一次。所有这些因为都是异步的设计,所以可能会导致在负载均衡的过程中,同一个queue可能会被两个消费者同时消费。这个就是所谓的,我们只能做到一个消息至少被消费一次,但equeue层面做不到一个消息只会被消费一次。实际上像rocketmq这种也是这样的思路,放弃一个消息只会被消费一次的实现(因为代价太大,且过于复杂,实际上对于分布式的环境,不太可能做到一个消息只会被消费一次),而是采用确保一个消息至少会被消费一次(即at least once).所以使用equeue,应用方要自己做好对每个消息的幂等处理。

5.如何实现实时消息推送

消息的实时推送,一般有两种做法:推模式(push)和拉模式(pull)。push的方式是指broker主动对所有订阅了该topic的消费者推送消息;pull的方式是指消费者主动到broker上拉取消息;对于推模式,最大的好处就是实时,因为一有新的消息,就会立即推送给消费者。但是有一个缺点就是如果消费者来不及消费,它也会给消费者推消息,这样就会导致消费者端的消息会堵塞。而通过拉的方式,有两种实现:1)轮训的方式拉,比如每隔5s轮训一下是否有新消息,这种方式的缺点是消息不实时,但是消费进度完全由消费者自己把控了;2)开长连接的方式来拉,就是不轮训,消费者和broker之间一直保持的连接通道,然后broker一有新消息,就会利用这个通道把消息发送给消费者。

equeue中目前采用的是通过长连接拉取消息的方式。长连接通过socket长连接实现。但是虽然叫长连接,也不是一直不断开,而是也会设计一个超时的限制,比如一个长连接最大不超过15s,超过15s,则broker发送回复给consumer,告诉consumer当前没有新消息;然后consumer接受到这个回复后,就知道要继续发起下一个长连接来拉取。然后假如在这15s中之内,broker上有新消息了,则broker就能立即主动利用这个长连接通知相应的消费者,把消息传给消费者。所以,可以看出,broker上在处理消费者的拉取消息的请求时,如果当前没有新消息,则会hold住这个socket连接,最多hold 15s,超过15s,则发送返回信息,告诉消费者当前无消息,然后消费者再次发送pull message request过来。通过这样的基于长连接的拉取模式,我们可以实现两个好处:1)消息实时推送;2)由消费者控制消息消费进度;

另外,equeue里还实现了消费者自身的自动限流功能。就是假如当前broker上消息很多,即生产者生产消息的速度大于消费者消费消息的速度,那broker上就会有消息被堆积。那此时消费者在拉取消息时,总是会有新消息拉取到,但是消费者又来不及处理这么多消息。所以equeue框架内置了一个限流(流控,流量控制)的设计,就是可以允许用于配制一个消费者端堆积的消息的上限,比如3000,超过这个数目(可配置),则equeue会让消费者以慢一点的频率拉取消息。比如延迟个多少毫秒(延迟时间可配置)再拉取。这样就简单的实现了流控的目的。

6.如何处理消息消费失败的情况

作为一个消息队列,消费者总是可能会在消费消息时抛出异常,在equeue中这种情况就是消息消费失败的情况。通过上面的消费进度的介绍,大家知道了每个queue对某个特定的consumer group,都有一个唯一的消费进度。实际上,消息被拉取到consumer本地后,可能会被以两种方式消费,一种是并行消费,一种是线性消费。

并行消费的意思是,假如当前一次性拉取过来32个消息,那equeue会通过启动task(即开多线程)的方式并行消费每个消息;

线性消费的意思是,消息是在一个独立的单线程中顺序消费,消费顺序和拉取过来的顺序相同。

对于线性消费,假如前一个消息消费的时候失败了,也就是抛异常了,那该怎么办呢?可能想到的办法是重试个3次,但是要是重试后还是失败呢?总不能因为这个消息而导致后面的消息无法把消费吧?呵呵!对于这种情况,先说一下rocketmq里的处理方式吧:它的做法是,当遇到消费失败的情况,没有立马重试,而是直接把这个消息发送到broker上的某个重试队列,发送成功后,就可以往下消费下一个消息了。因为一旦发送到重试队列,那意味着这个消息就最后总是会被消费了,因为该消息不会丢了。但是要是发送到broker的重试队列也不成功呢?这个?!其实这种情况不大应该出现,如果出现,那基本就是broker挂了,呵呵。

rocketmq中,对于这种情况,那会把这个失败的消息放入本地内存队列,慢慢消费它。然后继续往后消费后面的消息。现在你一定很关心queue的offset是如何更新的?这里涉及到一个滑动门的概念。当一批消息从broker拉取到消费者本地后,并不是马上消费的,而是先放入一个本地的SortedDictionary,key就是消息在queue里的位置,value就是消息本身。因为是一个排序的dictionary,所以key最小的消息意味着是最前面的消息,最大的消息就是最后面的消息。然后不管是并行消费还是线性消费,只要某个消息被消费了,那就从这个SortedDictionary里移除掉。每次被移除一个消息时,总是会返回当前这个SortedDictionary里的最小的key,然后我们就能判断这个key是否和上次比是否前移了,如果是,则更新queue的这个最新的offset。因为每次移除一个消息的时候,总是返回当前SortedDictionary里的最小的key,所以,假如当前offset是3,然后offset为4的这个消息一直消费失败,所以不会被移除,但是offset为5,6,7,8的这些消息虽然都消费成功了,但是只要offset为4的这个消息没有被移除,那最小的key就不会往前移动。这个就是所谓的滑动门的概念了。就好比是在铁轨上一辆在跑的动车,offset的往前移动就好比是动车在不断往前移动。因为我们希望offset总是会不断往前移动,所以不希望前面的某个消费失败的消息让这个滑动门停止移动(即我们总是希望这个最小的key能不断变大),所以我们会想方设法让消费失败的消息能不阻碍滑动门的往前移动。所以才把消费失败的消息放入重试队列。

另外一点需要注意一下:并不是每次成功消费完一个消息,就会立马告诉broker更新offset,因为这样那性能肯定很低,broker也会忙死,更好的办法是先只是在本地内存更新queue的offset,然后定时比如5s一次,将最新的offset更新到broker。所以,因为这个异步的存在,同样也会导致某个消息被重复消费的可能性,因为broker上的offset肯定比实际的消费进度要慢,有5s的时间差。所以,再次强调,应用方必须要处理好对消息的幂等处理!比如enode框架中,对每个command消息,框架内部都做了command的幂等处理。所以使用enode框架的应用,自身无需对command做幂等处理方面的考虑。

上面提到了并行消费和线性消费,其实对于offset的更新来说是一样的,因为并行消费无非是多线程同时从SortedDictionary中移除消费成功的消息,而单线程只是单个线程去移除SortedDictionary中的消息。所以我们要通过锁的机制,保证对SortedDictionary的操作是线程安全的。目前用了ReaderWriterLockSlim来实现对方法调用的线层安全。有兴趣的朋友可以去看一下代码。

最后,也是重点,呵呵。equeue目前还没有实现将失败的消息发回到broker的重试队列。这个功能以后会考虑加进去。

7.如何解决Broker的单点问题

这个问题比较复杂,目前equeue不支持broker的master-salve或master-master,而是单点的。我觉得一个成熟的消息队列,为了确保在一个broker挂了的时候,要尽量能确保有其他broker可以接替它,这样才能让消息队列服务器的可靠性。但是这个问题实在太复杂。rocketmq目前实现的也只是master-slave的方式。也就是只要主的master挂了,那producer就无法向broker发送消息了,因为slave的broker是只读的,不能直接接受新消息,slave的broker只能允许被consumer拉取消息。

这个问题,要讨论清楚,需要很多分布式方面的知识。由于篇幅的原因,这里就不做讨论了,实际上我自己也搞不清楚到底该如何设计。希望大牛们多多指点,如何实现broker的高可用哈!