【RocketMQ入门到精通】— RocketMQ初级特性能力 | Message Persistence,消息中间件通常采用的几种持久化方式,可以存到数据库里面甚至redis里,你知道不?

时间:2022-11-04 12:15:53

名言警句


任何先进的技术均与魔法无异


追本溯源

【​​经历了6个月的失踪,我将带着干货终究归来!【RocketMQ入门到精通】​​】


Message Persistence,消息中间件通常采用的几种持久化方式,可以存到数据库里面甚至redis里,你知道不?

RocketMQ的存储和消费关系

【RocketMQ入门到精通】— RocketMQ初级特性能力 | Message Persistence,消息中间件通常采用的几种持久化方式,可以存到数据库里面甚至redis里,你知道不?

RocketMQ是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点,Producer、Consumer、队列都可以分布式。

  • 能够保证严格的消息顺序
  • 提供丰富的消息拉取模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能力
  • 较少的依赖

Producer向一些队列轮流发送消息,队列集合称为Topic。

  • 如果做广播消费,则每个consumer实例消费这个Topic对应的所有队列。
  • 如果做集群消费,则多个Consumer实例平均消费这个topic对应的所有队列

RocketMQ存储方式

RocketMQ主要通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种mmap的方式减少了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)。

零拷贝原理

Consumer消费消息过程,使用了零拷贝,零拷贝包含以下两种方式:

使用 mmap + write 方式
  • 优点:即使频繁调用,使用小块文件传输,效率也很高
  • 缺点:不能很好的利用DMA方式,会比sendfile多消耗 CPU,内存安全性控制复杂,需要避免JVM Crash问题。
使用sendfile方式
  • 优点:利用DMA方式,消耗CPU较少,大块文件传输效率高,无内存安全新问题。
  • 缺点:小块文件效率低于mmap 方式,只能是BIO方式传输,不能使用 NIO。

RocketMQ 选择了第一种方式,mmap+write 方式,因为有小块数据传输的需求,效果会比 sendfile 更好。​

文件系统

RocketMQ选择 Linux Ext4文件系统,原因如下:

  • Ext4文件系统删除1G大小的文件通常耗时小于 50ms,而 Ext3文件系统耗时约 1s 左右,且删除文件时,磁盘IO压力极大,会导致 IO 写入超时。
  • 文件系统层面需要做以下调优措施
  • 文件系统 IO 调度算法需要调整为deadline,因为 deadline 算法在随机读情况下,可以合并读请求为顺序跳跃方式,从而提高读 IO 吞吐量。

数据存储结构

【RocketMQ入门到精通】— RocketMQ初级特性能力 | Message Persistence,消息中间件通常采用的几种持久化方式,可以存到数据库里面甚至redis里,你知道不?

RocketMQ主要的存储文件包含commitlogconsumequeueIndexFile。其文件布局如下:  

【RocketMQ入门到精通】— RocketMQ初级特性能力 | Message Persistence,消息中间件通常采用的几种持久化方式,可以存到数据库里面甚至redis里,你知道不?

Commitlog文件 

消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G 。

文件的命名也及其巧妙,使用该存储在消息文件中的第一个全局偏移量来命名文件文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;这样的设计主要是方便根据消息的物理偏移量,快速定位到消息所在的物理文件。

【RocketMQ入门到精通】— RocketMQ初级特性能力 | Message Persistence,消息中间件通常采用的几种持久化方式,可以存到数据库里面甚至redis里,你知道不?

消息存储文件:所有主题的消息随着到达Broker的顺序写入commitlog文件。RocketMQ commitlog文件使用顺序写,极大提高了文件的写性能。 

ConsumeQueue文件格式

消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件。

【RocketMQ入门到精通】— RocketMQ初级特性能力 | Message Persistence,消息中间件通常采用的几种持久化方式,可以存到数据库里面甚至redis里,你知道不?

ConsumeQueue文件引入的目的?

Commitlog文件的基于Topic的索引文件,主要用于消费者根据Topic消费消息,同一个队列中存在多个文件,ConsumeQueue设计极具技巧性,其每个条目使用固定长度(8字节commitlog物理偏移量、4字节消息长度、8字节tag hashcode)。

​​故ConsumeQueue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/ConsumeQueue/{topic}/{queueId}/{fileName}。同样ConsumeQueue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M;

ConsumeQueue文件存储tag hashcdoe而不是存储tag字符串的原因?

​这里不是存储tag的原始字符串,而是存储hashcode,目的就是确保每个条目的长度固定,可以使用访问类似数组下标的方式来快速定位条目,极大的提高了ConsumeQueue文件的读取性能。

ConsumeQueue计算存储偏移量

消息消费者根据topic、消息消费进度(ConsumeQueue逻辑偏移量),即第几个ConsumeQueue条目,这样根据消费进度去访问消息的方法为使用逻辑偏移量logicOffset * 20即可找到该条目的起始偏移量(ConsumeQueue文件中的偏移量),然后读取该偏移量后20个字节即得到了一个条目,无需遍历ConsumeQueue文件。 

  • 第N个消ConsumeQueue的元素数据的索引开始:(N-1)* 20+1
  • 第N个消ConsumeQueue的元素数据的索引结束:(N)* 20 
IndexFile 文件格式(ConsumeQueue的物理实现-broker端)

提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是:$HOME \store\index${fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M。

【RocketMQ入门到精通】— RocketMQ初级特性能力 | Message Persistence,消息中间件通常采用的几种持久化方式,可以存到数据库里面甚至redis里,你知道不?

IndexFile文件基于物理磁盘文件实现Hash索引。其文件由40字节的文件头、500W个hash槽,每个hash槽为4个字节,最后由2000万个Index条目,每个条目由20个字节构成,分别为4字节的索引key的hashcode、8字节消息物理偏移量、4字节时间戳、4字节的前一个Index条目(hash冲突的链表结构)。  

总体运作流程机制

【RocketMQ入门到精通】— RocketMQ初级特性能力 | Message Persistence,消息中间件通常采用的几种持久化方式,可以存到数据库里面甚至redis里,你知道不?

存储结构分析机制
  • 数据单独存储到一个Commit Log完全
  • 队列实际只存储消息在Commit Log的位置信息,并且串行方式刷盘。
这样做的好处如下
  • 队列轻量化,单个队列数据量非常少。
  • 对磁盘的访问串行化,避免磁盘竟争,不会因为队列增加导致 IOWAIT 增高。
它的缺点如下
  • 写虽然完全是顺序写,但是读却变成了完全的随机读。
  • 读一条消息,会先读 Consume Queue,再读 Commit Log,增加了开销。
  • 要保证 Commit Log 与 Consume Queue 完全的一致,增加了编程的复杂度。
以上缺点如何克服
  • 随机读,尽可能让读命中PAGECACHE,减少 IO 读操作,所以内存越大越好。如果系统中堆积的消息过多,读数据要访问磁盘会不会由于随机读导致系统性能急剧下降,答案是否定的。
  • 访问 PAGECACHE 时,即使只访问1k 的消息,系统也会提前预读出更多数据,在下次读时,就可能命中内存。
  • 随机访问 Commit Log 磁盘数据,系统 IO 调度算法设置为 NOOP 方式,会在一定程度上将完全的随机读变成顺序跳跃方式,而顺序跳跃方式读较完全的随机读性能会高 5 倍以上,可参见以下针对各种 IO方式的性能数据。

http://stblog.baidu-tech.com/?p=851

  • 4k的消息在完全随机访问情况下,仍然可以达到8K次每秒以上的读性能。
  • 由于Consume Queue存储数据量极少,而且是顺序读,在 PAGECACHE 预读作用下,Consume Queue的读性能几乎与内存一致,即使堆积情况下。所以可认为 Consume Queue完全不会阻碍读性能。
  • Commit Log 中存储了所有的元信息,包含消息体,类似于 Mysql、Oracle 的 redolog,所以只要有 Commit Log 在,Consume Queue 即使数据丢失,仍然可以恢复出来。

RocketMQ存储目录结构

|-- abort

|-- checkpoint

|-- config

| |-- consumerOffset.json

| |-- consumerOffset.json.bak

| |-- delayOffset.json

| |-- delayOffset.json.bak

| |-- subscriptionGroup.json

| |-- subscriptionGroup.json.bak

| |-- topics.json

| `-- topics.json.bak

|-- commitlog

| |-- 00000003384434229248

| |-- 00000003385507971072

| `-- 00000003386581712896

`-- consumequeue

|-- %DLQ%ConsumerGroupA

| `-- 0

| `-- 00000000000006000000

|-- %RETRY%ConsumerGroupA

| `-- 0

| `-- 00000000000000000000

|-- %RETRY%ConsumerGroupB

| `-- 0

| `-- 00000000000000000000

|-- SCHEDULE_TOPIC_XXXX

| |-- 2

| | `-- 00000000000006000000

| |-- 3

| | `-- 00000000000006000000

|-- TopicA

| |-- 0

| | |-- 00000000002604000000

| | |-- 00000000002610000000

| | `-- 00000000002616000000

| |-- 1

| | |-- 00000000002610000000

| | `-- 00000000002616000000

|-- TopicB

| |-- 0

| | `-- 00000000000732000000

| |-- 1

| | `-- 00000000000732000000

| |-- 2

| | `-- 00000000000732000000

RocketMQ存储总体结构关系图

【RocketMQ入门到精通】— RocketMQ初级特性能力 | Message Persistence,消息中间件通常采用的几种持久化方式,可以存到数据库里面甚至redis里,你知道不?

在上面的RocketMQ的消息存储整体架构图中可以看出,RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。RocketMQ的混合型存储结构(多个Topic的消息实体内容都存储于一个CommitLog中)针对Producer和Consumer分别采用了数据和索引部分相分离的存储结构,Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。只要消息被刷盘持久化至磁盘文件CommitLog中,那么Producer发送的消息就不会丢失。正因为如此,Consumer也就肯定有机会去消费这条消息。当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker允许等待30s的时间,只要这段时间内有新消息到达,将直接返回给消费端。这里,RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。

页缓存与内存映射

页缓存(PageCache)是OS对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache。对于数据的写入,OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中PageCache的情况,OS从物理磁盘*问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。

RocketMQ中,ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取,在page cache机制的预读取作用下,Consume Queue文件的读性能几乎接近读内存,即使在有消息堆积情况下也不会影响性能。而对于CommitLog消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO调度算法,比如设置调度算法为“Deadline”(此时块存储采用SSD的话),随机读的性能也会有所提升。