RocketMQ如何保证消息的可靠性?

时间:2021-09-23 01:43:01

RocketMQ如何保证消息的可靠性?

消息的发送方式有哪几种?存储消息的可靠性面临哪些挑战?消费消息的确认机制是怎样的?本文通过分析消息流转的整个过程,从消息发送、消息存储和消息消费三个阶段介绍RocketMQ是如何保证消息的可靠性的。

分布式系统中一个重要的前提假设是所有的网络传输都是不可靠的,在网络传输不可靠的情况下,保证消息的可靠传输,除了进行重试投递别无他法。常用的绝大多数消息队列RocketMQ、RabbitMQ等在消息传输上都只能保证至少传输成功一次,也即(At least once),而不能保证只传输成功一次(Exactly once)。由于分布式系统网络的不可靠,可能就会出现消息丢失的现象,那么RocketMQ是如何最大限度的保证消息不丢失的呢?那就需要从消息的产生到最终消费的整个过程来分析,消息完整链路可以划分为以下三个阶段:

  • 生产阶段:消息在 Producer 发送端创建出来,经过网络传输发送到 Broker 存储端。
  • 存储阶段:消息在 Broker 端存储,如果是主备或者多副本,消息会在这个阶段被复制到其他的节点或者副本上。
  • 消费阶段:Consumer 消费端从 Broker存储端拉取消息,经过网络传输发送到 Consumer 消费端上,并通过重试来最大限度的保证消息的消费。

一 发送端消息可靠性

发送端Producer发送消息Broker端的核心逻辑如下图所示:

RocketMQ如何保证消息的可靠性?

消息发送一般有以下几种方式:同步发送、异步发送以及单向发送,业务具体选择哪种方式进行消息发送,需要根据情况进行判断,下面具体介绍不同的发送方式实现的消息可靠性保证。

1 同步发送

同步发送是指发送端在发送消息时,阻塞线程进行等待,直到服务器返回发送的结果。发送端如果需要保证消息的可靠性,防止消息发送失败,可以采用同步阻塞式的发送,然后同步检查Brocker返回的状态来判断消息是否持久化成功。如果发送超时或者失败,则会默认重试2次,RocketMQ选择至少传输成功一次的消息模型,但是有可能发生重复投递,因为网络传输是不可靠的,具体的重试策略可以参照第四小节。

2 异步发送

异步发送是指发送端在发送消息时,传入回调接口实现类,调用该发送接口后不会阻塞,发送方法会立即返回,回调任务会在另一个线程中执行,消息发送结果会回传给相应的回调函数。具体的业务实现可以根据发送的结果信息来判断是否需要重试来保证消息的可靠性。

3 单向发送

单向发送是指发送端发送完成之后,调用该发送接口后立刻返回,并不返回发送的结果,业务方无法根据发送的状态来判断消息是否发送成功,单向发送相对前两种发送方式来说是一种不可靠的消息发送方式,因此要保证消息发送的可靠性,不推荐采用这种方式来发送消息。

4 发送重试策略

RocketMQ架构模型中会有多个Borker为某个topic提供服务,一个topic下的消息分散存储在多个Broker存储端,它们是多对多关系。Broker会将其提供存储服务的topic的元数据信息上报到NameServer,对等NameServer节点组成的高可用服务会维护topic与Broker之间的映射关系,多对多的映射关系为消息可以重试发送到多个Broker端提供了前提与基础。

当发送端需要发送消息时,如果发送端中缓存了topic的路由信息,并包含了消息队列,则直接返回该路由信息,如果没有缓存或没有消息队列,则向NameServer查询该topic的路由信息,查询到路由消息之后,采用指定的队列选择策略选择相应的queue发送消息,默认是采用轮询策略,发送成功则返回, 收到异常则根据相应的策略进行重试,可以根据发送端感知到的Broker的时延、上次发送失败的Broker信息和发送端配置的是否重试不同Broker的参数以及发送端设置的最大超时时间等等策略来灵活地实现不同等级的消息发送可靠性保证。重试策略可以有效的保证消息发送成功的概率,最终提高消息发送的可靠性。

二 存储端消息可靠性

RocketMQ的消息存储结构如下图所示:

RocketMQ如何保证消息的可靠性?

  • 消息队列存储的最小单位是消息Message。
  • 同一个Topic下的消息映射成多个逻辑队列。
  • 不同Topic的消息按照到达broker的先后顺序以Append的方式添加至CommitLog,顺序写,随机读。

目前RocketMQ存储模型使用本地磁盘进行存储,数据写入为producer -> direct memory -> pagecache -> 磁盘,数据读取如果pagecache有数据则直接从pagecache读,否则需要先从磁盘加载到pagecache中。Broker存储节点的文件存储模式如下图所示:

RocketMQ如何保证消息的可靠性?

Broker端CommitLog采用顺序写,可以大大提高写入效率,同时采用不同的刷盘模式提供不同的数据可靠性保证,此外采用了ConsumeQueue中间结构来存储偏移量信息,实现消息的分发。由于ConsumeQueue结构固定且大小有限,在实际情况中,大部分的ConsumeQueue 能够被全部读入内存,可以达到内存读取的速度。此外为了保证CommitLog和ConsumeQueue的一致性, CommitLog里存储了Consume Queues 、Message Key、Tag等所有信息,即使ConsumeQueue丢失,也可以通过 commitLog完全恢复出来,这样只要保证commitLog数据的可靠性,就可以保证Consume Queue的可靠性。

RocketMQ存储端采用本地磁盘进行CommitLog消息数据的存储,不可避免的就会带来存储可靠性的挑战,如何保证消息不丢失,RocketMQ消息服务一直在不断提高数据的可靠性。

1 存储可靠性挑战

RocketMQ存储端也即Broker端在存储消息的时候会面临以下的存储可靠性挑战:

  1. Broker正常关闭
  2. Broker异常Crash
  3. OS Crash
  4. 机器掉电,但是能立即恢复供电情况
  5. 机器无法开机(可能是cpu、主板、内存等关键设备损坏)
  6. 磁盘设备损坏

1正常关闭,Broker 可以正常启动并恢复所有数据。2、3、4同步刷盘可以保证数据不丢失,异步刷盘可能导致少量数据丢失。5、6属于单点故障,且无法恢复。解决单点故障可以采用增加Slave节点,主从异步复制仍然可能有极少量数据丢失,同步复制可以完全避免单点问题。

这里一般来说就需要在性能和可靠性之间做出取舍,对于RocketMQ来说,Broker的可靠性主要由两个方面保障:

  • 单机的刷盘机制
  • 主从之间的数据复制

如果设置为每条消息都强制刷盘、主从复制,那么性能无疑会降低;如果不这样设置,就会有一定的可能性丢失消息。RocketMQ一般都是先把消息写到PageCache中,然后再持久化到磁盘上,数据从pagecache刷新到磁盘有两种方式,同步和异步。整体的消息写入和读取如下图所示:

RocketMQ如何保证消息的可靠性?

针对broker端单机存储可靠性,主要依赖单机的刷盘策略,主从之间的副本复制可以参考下一章节的主从模式。

2 同步刷盘

消息写入内存的 PageCache后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。这种方式可以保证数据绝对安全,但是吞吐量不大。

3 异步刷盘(默认)

消息写入到内存的 PageCache中,就立刻给客户端返回写操作成功,当 PageCache中的消息积累到一定的量时,触发一次写操作,或者定时等策略将 PageCache中的消息写入到磁盘中。这种方式吞吐量大,性能高,但是 PageCache中的数据可能丢失,不能保证数据绝对的安全。

实际应用中要结合业务场景,合理设置刷盘方式,尤其是同步刷盘的方式,由于频繁的触发磁盘写动作,会明显降低性能。

4 过期文件删除

由于RocketMQ操作CommitLog、ConsumeQueue文件是基于文件内存映射机制,并且在启动的时候会将所有的文件加载,为了避免内存与磁盘的浪费、能够让磁盘能够循环利用、避免因为磁盘不足导致消息无法写入等引入了文件过期删除机制。最终使得磁盘水位保持在一定水平,最终保证新写入消息的可靠存储。

三 消费端消息可靠性

RockerMQ默认提供了至少消费一次的消费语义来保证消息的可靠消费。

通常消费消息的确认机制一般分为两种思路:

  1. 先提交后消费
  2. 先消费,消费成功后再提交

思路1可以解决重复消费的问题但是会丢失消息,因此RocketMQ默认实现的是思路2,由各自consumer业务方保证幂等来解决重复消费问题。

消费端Consumer消费消息核心逻辑如下图所示:

RocketMQ如何保证消息的可靠性?

1 消费重试

消费者从RocketMQ拉取到消息之后,需要返回消费成功来表示业务方正常消费完成。因此只有返回CONSUME_SUCCESS才算消费完成,如果返回CONSUME_LATER则会按照不同的messageDelayLevel时间进行再次消费,时间分级从秒到小时,最长时间为2个小时后再次进行消费重试,如果消费满16次之后还是未能消费成功,则不再重试,会将消息发送到死信队列,从而保证消息存储的可靠性。

2 死信队列

未能成功消费的消息,消息队列并不会立刻将消息丢弃,而是将消息发送到死信队列,其名称是在原队列名称前加%DLQ%,如果消息最终进入了死信队列,则可以通过RocketMQ提供的相关接口从死信队列获取到相应的消息,保证了消息消费的可靠性。

3 消息回溯

回溯消费是指Consumer已经消费成功的消息,或者之前消费业务逻辑有问题,现在需要重新消费。要支持此功能,则Broker存储端在向Consumer消费端投递成功消息后,消息仍然需要保留。重新消费一般是按照时间维度,例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据。RocketMQ Broker提供了一种机制,可以按照时间维度来回退消费进度,这样就可以保证只要发送成功的消息,只要消息没有过期,消息始终是可以消费到的。

四 总结

本文从消息流转的整个过程分析了RocketMQ如何保证消息的可靠性,消息发送通过不同的重试策略保证了消息的可靠发送,消息存储通过不同的刷盘机制以及多副本来保证消息的可靠存储,消息消费通过至少消费成功一次以及消费重试机制来保证消息的可靠消费,RocketMQ在保证消息的可靠性上做到了全链路闭环,最大限度的保证了消息不丢失。

原文地址:https://zhuanlan.51cto.com/art/202102/644179.htm