一个纯C#写的分布式消息队列介绍2

时间:2022-04-11 06:43:15

一年前,当我第一次开发完EQueue后,写过一篇文章介绍了其整体架构,做这个框架的背景,以及架构中的所有基本概念。通过那篇文章,大家可以对EQueue有一个基本的了解。经过了1年多的完善,EQueue无论是功能上还是成熟性上都完善了不少。所以,希望再写一篇文章,介绍一下EQueue的整体架构和关键特性。

EQueue架构

EQueue是一个分布式的、轻量级、高性能、具有一定可靠性,纯C#编写的消息队列,支持消费者集群消费模式。

主要包括三个部分:producer, broker, consumer。producer就是消息发送者;broker就是消息队列服务器,负责接收producer发送过来的消息,以及持久化消息;consumer就是消息消费者,consumer从broker采用拉模式到broker拉取消息进行消费,具体采用的是long polling(长轮训)的方式。这种方式的最大好处是可以让broker非常简单,不需要主动去推消息给consumer,而是只要负责持久化消息即可,这样就减轻了broker server的负担。同时,consumer由于是自己主动去拉取消息,所以消费速度可以自己控制,不会出现broker给consumer消息推的太快导致consumer来不及消费而挂掉的情况。在消息实时性方面,由于是长轮训的方式,所以消息消费的实时性也可以保证,实时性和推模型基本相当。

EQueue是面向topic的架构,和传统的MSMQ这种面向queue的方式不同。使用EQueue,我们不需要关心queue。producer发送消息时,指定的是消息的topic,而不需要指定具体发送到哪个queue。同样,consumer发送消息也是一样,订阅的是topic,不需要关心自己想从哪个queue接收消息。然后,producer客户端框架内部,会根据当前的topic获取所有可用的queue,然后通过某种queue select strategy选择一个queue,然后把消息发送到该queue;同样,consumer端,也会根据当前订阅的topic,获取其下面的所有的queue,以及当前所有订阅这个topic的consumer,按照平均的方式计算出当前consumer应该分配到哪些queue。这个分配的过程就是消费者负载均衡。

Broker的主要职责是:

发送消息时:负责接收producer的消息,然后持久化消息,然后建立消息索引信息(把消息的全局offset和其在queue中的offset简历映射关系),然后返回结果给producer;

消费消息时:负责根据consumer的pull message request,查询一批消息(默认是一次pull request拉取最多32个消息),然后返回给consumer;

各位看官如果对EQueue中的一些基本概念还不太清楚,可以看一下我去年写的介绍1,写的很详细。下面,我想介绍一下EQueue的一些有特色的地方。

EQueue关键特性高性能与可靠性设计

网络通信模型,采用.NET自带的SocketAsyncEventArgs,内部基于Windows IOCP网络模型。发送消息支持async, sync, oneway三种模式,无论是哪种模式,内部都是异步模式。当同步发送消息时,就是框架帮我们在异步发送消息后,同步等待消息发送结果,等到结果返回后,才返回给消息发送者;如果一定时间还不返回,则报超时异常。在异步发送消息时,采用从EventStore开源项目中学习到的优秀的socket消息发送设计,目前测试下来,性能高效、稳定。通过几个案例运行很长时间,没有出现通信层方面的问题。

broker消息持久化的设计。采用WAL(Write-Ahead Log)技术,以及异步批量持久化到SQL Server的方式确保消息高效持久化且不会丢。消息到达broker后,先写入本地日志文件,这种设计在db, nosql等数据库中很常见,都是为了确保消息或请求不丢失。然后,再异步批量持久化消息到SQL Server,采用.NET自带的SqlBulkCopy技术。这种方式,我们可以确保消息持久化的实时性和很高的吞吐量,因为一条消息只要写入本地日志文件,然后放入内存的一个dict即可。

当broker意外宕机,可能会有一些消息还没持久化到SQL Server;所以,我们在重启broker时,我们除了先从SQL Server恢复所有未消费的消息到内存外,同时记录当前SQL Server中的最后一条消息的offset,然后我们从本地日志文件扫描offset+1开始的所有消息,全部恢复到SQL Server以及内存。

需要简单提一下的是,我们在把消息写入到本地日志文件时,不可能全部写入到一个文件,所以要拆文件。目前是根据log4net来写消息日志,每100MB一个日志文件。为什么是100MB?是因为,我们的这个消息日志文件的用途主要是用来在Broker重启时,恢复SQL Server中最后还没来得及持久化的那些消息的。正常情况下,这些消息量应该不会很多。所以,我们希望,当扫描本地日志文件时,尽量能快速的扫描文件。通常100MB的消息日志文件,已经可以存储不少的消息量,而SQL Server中未持久化的消息通常不会超过这个量,除非当机前,出现长时间消息无法持久化的情况,这种情况,应该会被我们监控到并及时发现,并采取措施。当然,每个消息日志文件的大小,可以支持配置。另外一点,就是从日志文件恢复的时候,还是需要有一个算法的,因为未被持久化的消息,有可能不只在最近的一个消息日志文件里,有可能在多个日志文件里,因为就像前面所说,会出现大量消息没有持久化到SQL Server的情况。

但总之,在保证高性能的前提下,消息不丢(可靠性)是完全可以保证的。