RocketMQ消息存储

时间:2022-06-23 02:32:22

转载:RocketMQ源码学习--消息存储篇

消息中间件—RocketMQ消息存储(一)

RocketMQ高性能之底层存储设计

存储架构

RMQ存储架构

RocketMQ消息存储

上图即为RocketMQ的消息存储整体架构,RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog,1G)来存储。

Consume Queue相当于kafka中的partition,是一个逻辑队列,存储了这个Queue在CommiLog中的     起始offset,log大小和MessageTag的hashCode。

每次读取消息队列先读取consumerQueue,然后再通过consumerQueue去commitLog中拿到消息主体。

Kafka存储架构

rocketMQ的设计理念很大程度借鉴了kafka,所以有必要介绍下kafka的存储结构设计:

RocketMQ消息存储

存储特点: 和RocketMQ类似,每个Topic有多个partition(queue),kafka的每个partition都是一个独立的物理文件,消息直接从里面读写。

根据之前阿里中间件团队的测试,一旦kafka中Topic的partitoin数量过多,队列文件会过多,会给磁盘的IO读写造成很大的压力,造成tps迅速下降。

所以RocketMQ进行了上述这样设计,consumerQueue中只存储很少的数据,消息主体都是通过CommitLog来进行读写。

ps:上一行加粗理解:consumerQueue存储少量数据,即使数量很多,但是数据量不大,文件可以控制得非常小,绝大部分的访问还是Page Cache的访问,而不是磁盘访问。正式部署也可以将CommitLog和consumerQueue放在不同的物理SSD,避免多类文件进行IO竞争。

RMQ存储设计优缺点

优点:

队列轻量化,单个队列数据量非常少。对磁盘的访问串行化,避免磁盘竟争,不会因为队列增加导致IOWAIT增高。

缺点:

写虽然完全是顺序写,但是读却变成了完全的随机读。

读一条消息,会先读ConsumeQueue,再读CommitLog,增加了开销。

要保证CommitLog与ConsumeQueue完全的一致,增加了编程的复杂度。

缺点克服:

随机读,尽可能让读命中page cache,减少IO读操作,所以内存越大越好。如果系统中堆积的消息过多,读数据要访问磁盘会不会由于随机读导致系统性能急剧下降,答案是否定的。 
访问page cache 时,即使只访问1k的消息,系统也会提前预读出更多数据,在下次读时,就可能命中内存。 
随机访问Commit Log磁盘数据,系统IO调度算法设置为NOOP方式,会在一定程度上将完全的随机读变成顺序跳跃方式,而顺序跳跃方式读较完全的随机读性能会高5倍以上。 
另外4k的消息在完全随机访问情况下,仍然可以达到8K次每秒以上的读性能。 
由于Consume Queue存储数据量极少,而且是顺序读,在PAGECACHE预读作用下,Consume Queue的读性能几乎与内存一致,即使堆积情况下。所以可认为Consume Queue完全不会阻碍读性能。 
Commit Log中存储了所有的元信息,包含消息体,类似于Mysql、Oracle的redolog,所以只要有Commit Log在,Consume Queue即使数据丢失,仍然可以恢复出来。

RMQ存储底层实现

MappedByteBuffer

RocketMQ中的文件读写主要就是通过MappedByteBuffer进行操作,来进行文件映射。利用了nio中的FileChannel模型,可以直接将物理文件映射到缓冲区,提高读写速度。

这种Mmap的方式减少了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销。

这里需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了。

page cache

刚刚提到的缓冲区,也就是之前说到的page cache。

通俗的说:pageCache是系统读写磁盘时为了提高性能将部分文件缓存到内存中,下面是详细解释:

page cache:这里所提及到的page cache,在我看来是linux中vfs虚拟文件系统层的cache层,一般pageCache默认是4K大小,它被操作系统的内存管理模块所管理,文件被映射到内存,一般都是被mmap()函数映射上去的。

mmap()函数会返回一个指针,指向逻辑地址空间中的逻辑地址,逻辑地址通过MMU映射到page cache上。

RocketMQ消息存储

上图中,整个OS有3.7G的物理内存,用掉了2.7G,应当还剩下1G空闲的内存,但OS给出的却是175M。

因为OS发现系统的物理内存有大量剩余时,为了提高IO的性能,就会使用多余的内存当做文件缓存,也就是图上的buff / cache,广义我们说的Page Cache就是这些内存的子集。

pageCache缺点:

内核把可用的内存分配给Page Cache后,free的内存相对就会变少,如果程序有新的内存分配需求或者缺页中断,恰好free的内存不够,内核还需要花费一点时间将热度低的Page Cache的内存回收掉,对性能非常苛刻的系统会产生毛刺。

内存映射    零拷贝

RMQ发送、消费逻辑

发送逻辑

发送时,Producer不直接与Consume Queue打交道。上文提到过,RMQ所有的消息都会存放在Commit Log中,为了使消息存储不发生混乱,对Commit Log进行写之前就会上锁。

消息持久被锁串行化后,对Commit Log就是顺序写,也就是常说的Append操作。配合上Page Cache,RMQ在写Commit Log时效率会非常高。

Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据,不停的轮询,将当前的consumeQueue中的offSet和commitLog中的offSet进行对比,将多出来的offSet进行解析,然后put到consumeQueue中的MapedFile中。

ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。而IndexFile(索引文件)则只是为了消息查询提供了一种通过key或时间区间来查询消息的方法(ps:这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程)。

消费逻辑

消费时,Consumer不直接与Commit Log打交道,而是从Consume Queue中去拉取数据。拉取的顺序从旧到新,在文件表示每一个Consume Queue都是顺序读,充分利用了Page Cache。光拉取Consume Queue是没有数据的,里面只有一个对Commit Log的引用,所以再次拉取Commit Log。

但整个RMQ只有一个Commit Log,虽然是随机读,但整体还是有序地读,只要那整块区域还在Page Cache的范围内,还是可以充分利用Page Cache。(dstat命令)

对于CommitLog消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO调度算法,比如设置调度算法为“Noop”(此时块存储采用SSD的话),随机读的性能也会有所提升。

刷盘方式

RocketMQ消息存储

同步刷盘

在消息真正落盘后,才返回成功给Producer,只要磁盘没有损坏,消息就不会丢。一般只用于金融场景。

异步刷盘

读写文件充分利用了Page Cache,即写入Page Cache就返回成功给Producer,RMQ中有两种方式进行异步刷盘,整体原理是一样的。

RMQ文件存储模型层

RocketMQ消息存储

RocketMQ文件存储模型层次结构如上图所示,根据类别和作用从概念模型上大致可以划分为5层,下面将从各个层次分别进行分析和阐述:

(1)RocketMQ业务处理器层

Broker端对消息进行读取和写入的业务逻辑入口,这一层主要包含了业务逻辑相关处理操作(根据解析RemotingCommand中的RequestCode来区分具体的业务操作类型,进而执行不同的业务处理流程),比如前置的检查和校验步骤、构造MessageExtBrokerInner对象、decode反序列化、构造Response返回对象等。

(2)RocketMQ数据存储组件层

该层主要是RocketMQ的存储核心类—DefaultMessageStore,其为RocketMQ消息数据文件的访问入口,通过该类的“putMessage()”和“getMessage()”方法完成对CommitLog消息存储的日志数据文件进行读写操作(具体的读写访问操作还是依赖下一层中CommitLog对象模型提供的方法);另外,在该组件初始化时候,还会启动很多存储相关的后台服务线程,包括AllocateMappedFileService(MappedFile预分配服务线程)、ReputMessageService(回放存储消息服务线程)、HAService(Broker主从同步高可用服务线程)、StoreStatsService(消息存储统计服务线程)、IndexService(索引文件服务线程)等。

(3)RocketMQ存储逻辑对象层

该层主要包含了RocketMQ数据文件存储直接相关的三个模型类IndexFile、ConsumerQueue和CommitLog。IndexFile为索引数据文件提供访问服务,ConsumerQueue为逻辑消息队列提供访问服务,CommitLog则为消息存储的日志数据文件提供访问服务。这三个模型类也是构成了RocketMQ存储层的整体结构(对于这三个模型类的深入分析将放在后续篇幅中)。

(4)封装的文件内存映射层

RocketMQ主要采用JDK NIO中的MappedByteBuffer和FileChannel两种方式完成数据文件的读写。其中,采用MappedByteBuffer这种内存映射磁盘文件的方式完成对大文件的读写,在RocketMQ中将该类封装成MappedFile类。这里限制的问题在上面已经讲过;对于每类大文件(IndexFile/ConsumerQueue/CommitLog),在存储时分隔成多个固定大小的文件(单个IndexFile文件大小约为400M、单个ConsumerQueue文件大小约5.72M、单个CommitLog文件大小为1G),其中每个分隔文件的文件名为前面所有文件的字节大小数+1,即为文件的起始偏移量,从而实现了整个大文件的串联。这里,每一种类的单个文件均由MappedFile类提供读写操作服务(其中,MappedFile类提供了顺序写/随机读、内存数据刷盘、内存清理等和文件相关的服务)。

(5)磁盘存储层

主要指的是部署RocketMQ服务器所用的磁盘。这里,需要考虑不同磁盘类型(如SSD或者普通的HDD)特性以及磁盘的性能参数(如IOPS、吞吐量和访问时延等指标)对顺序写/随机读操作带来的影响。