C#分布式消息队列 EQueue 2.0 发布啦

时间:2022-04-11 06:43:03

最近花了我几个月的业余时间,对EQueue做了一个重大的改造,消息持久化采用本地写文件的方式。到现在为止,总算完成了,所以第一时间写文章分享给大家这段时间我所积累的一些成果。

EQueue开源地址:https://github.com/tangxuehua/equeue

EQueue相关文档:

EQueue Nuget地址:

昨天,我写过一篇关于EQueue 2.0性能测试结果的文章,有兴趣的可以看看。

文章地址:

为什么要改为文件存储?SQL Server的问题

之前EQueue的消息持久化是采用SQL Server的。一开始我觉得没什么问题,采用的是异步定时批量持久化,使用SqlBulkCopy的方法,这个方法测试下来,批量插入消息的性能还不错,就决定使用了。一开始我并没有在使用到EQueue后做集成的性能测试。在功能上确实没什么问题了。而且使用DB持久化也有很多好处,比如消息查询很简单,DB天生支持各种方式的查询。删除消息也非常简单,一条DELETE语句即可。所以功能实现比较顺利。但后来当我对EQueue做性能测试时,发现一些问题。当数据库服务器和Broker本身部署在不同的服务器上时,持久化消息也会走网卡,消耗带宽,影响消息的发送和消费的TPS。而如果数据库服务器部署在Broker同一台服务器上,则因为SQLServer本身也会消耗CPU以及内存,也会影响Broker的消息发送和消费的TPS。另外SqlBulkCopy的速度,再本身机器正在接收大量的发送消息和拉取消息的请求时,会不太稳定。经过一些测试,发现整个EQueue Broker的性能不太理想。然后又想想,Broker服务器有有一个硬件一直没有好好利用起来,那就是硬盘。假设我们的消息是持久化到本地硬盘的,顺序写文件,就应该能解决SQL Server的问题了。所以,开始调研如何实现文件持久化消息的方案了。

消息缓存在托管内存的GC的问题

之前消息存储在SQL Server,如果消费者每次读取消息时,总是从数据库去读取,那对数据库就是不断的写入和读取,性能不太理想。所以当初的思路是,尽量把最近可能要被消费的消息缓存在本地内存中。当初的做法是设计了一个很大的ConcurrentDictionary<long, Message>,这个字典就是存放了所有可能会被消费的消息。如果要消费的消息当前不在这个字典里,就批量从DB拉取一批出来消费。这个设计可以尽可能的避免读取DB的情况。但是带来了另一个问题。就是我们对这个字典在高并发不断的写入和读取。且这个字典里缓存的消息又很多,到到达几百上千万时,GC的压力过大,导致很多线程都会被阻塞。严重影响Broker的TPS。

所以,基于上面的两个主要原因,我想到了两个思路来解决:1)采用写文件的方式来持久化消息;2)使用非托管内存来缓存将要被消费的消息;下面我们来看看这两个设计的一些关键问题的设计思路。

文件存储的关键问题设计心路背景

之前一直无法驾驭写文件的设计。因为精细化的将数据写入文件,并能要精确的读取想要的数据,实在没什么经验。之前虽然也知道阿里的RocketMQ的消息持久化也是采用顺序写文件的方式的,但是看了代码,发现设计很复杂,一下子也比较难懂。尝试看了多次也无法完全理解。所以一直无法掌握这种方式。有一天不经意间想到之前看过的EventStore这个开源项目中,也有写文件的设计。这个项目是CQRS架构之父greg young所主导的开源项目,是一个专门为ES(Event Sourcing)设计模式中提供保存事件流支持的事件流存储系统。于是下定决心专研其源码,看C#代码肯定还是比Java容易,呵呵。经过一段时间的摸索之后,,基本学到了它是如何写文件以及如何读文件的。了解了很多设计思路。然后,在看懂了EventStore的文件存储设计之后,再去看RocketMQ的文件持久化的设计,发现惊人的相似。原来看不懂的代码现在也能看懂了,因为思路差不多的。所以,这给我开始动手提供了很大的信心。经过自己的一些准备(文件读写的性能验证)和设计思路整理后,终于开始动手了。

如何写消息到文件?

其实说出来也很简单。之前一直以为写文件就是一个消息一行呗。这样当我们要找哪个消息时,只需要知道行号即可。确实,理论上这样也挺好。但上面这两个开源项目都不是这样做的,而是都是采用更精细化的直接写二进制的方式。搞清楚写入的格式之后,还要考虑一个文件写不下的时候怎么办?因为一个文件总是有大小的,比如1G,那超过1G后,必然要创建新的文件,然后把消息写入新的文件。所以,我们就又有了Chunk的概念。一个Chunk就是一个文件,假设我们现在实现了一个FileMessageStore,表示对文件持久化的封装,那这个FileMessageStore肯定维护了一堆的Chunk。然后我们也很容易想到一点,就是Chunk有3种状态:1)New,表示刚创建的Chunk,这种Chunk我们可以写入新消息进去;2)Completed,已写入完成的Chunk,这种Chunk是只读的;3)OnGoing的Chunk,就是当FileMessageStore初始化时,要从磁盘的某个chunk的目录下加载所有的Chunk文件,那不难理解,最后一个文件之前的Chunk文件应该都是Completed的;最后一个Chunk文件可能写入了一半,就是之前没完全用完的。所以,本质上New和Ongoing的Chunk其实是一样的,只是初始化的方式不同。