当我们说顺序时,我们在说什么?
日常思维中,顺序大部分情况会和时间关联起来,即时间的先后表示事件的顺序关系。
比如事件A发生在下午3点一刻,而事件B发生在下午4点,那么我们认为事件A发生在事件B之前,他们的顺序关系为先A后B。
上面的例子之所以成立是因为他们有相同的参考系,即他们的时间是对应的同一个物理时钟的时间。如果A发生的时间是北京时间,而B依赖的时间是东京时间,那么先A后B的顺序关系还成立吗?
如果没有一个绝对的时间参考,那么A和B之间还有顺序吗,或者说怎么断定A和B的顺序?
显而易见的,如果A、B两个事件之间如果是有因果关系的,那么A一定发生在B之前(前因后果,有因才有果)。相反,在没有一个绝对的时间的参考的情况下,若A、B之间没有因果关系,那么A、B之间就没有顺序关系。
那么,我们在说顺序时,其实说的是:
-
有绝对时间参考的情况下,事件的发生时间的关系;
-
和没有时间参考下的,一种由因果关系推断出来的happening before的关系;
在分布式环境中讨论顺序
当把顺序放到分布式环境(多线程、多进程都可以认为是一个分布式的环境)中去讨论时:
-
同一线程上的事件顺序是确定的,可以认为他们有相同的时间作为参考
-
不同线程间的顺序只能通过因果关系去推断
(点表示事件,波浪线箭头表示事件间的消息)
上图中,进程P中的事件顺序为p1->p2->p3->p4(时间推断)。而因为p1给进程Q的q2发了消息,那么p1一定在q2之前(因果推断)。但是无法确定p1和q1之间的顺序关系。
推荐阅读《Time, Clocks, and the Ordering of Events in a Distributed System》,会透彻的分析分布式系统中的顺序问题。
消息中间件中的顺序消息
什么是顺序消息
有了上述的基础之后,我们回到本篇文章的主题中,聊一聊消息中间件中的顺序消息。
顺序消息(FIFO 消息)是 MQ 提供的一种严格按照顺序进行发布和消费的消息类型。顺序消息由两个部分组成:顺序发布和顺序消费。
顺序消息包含两种类型:
分区顺序:一个Partition内所有的消息按照先进先出的顺序进行发布和消费
全局顺序:一个Topic内所有的消息按照先进先出的顺序进行发布和消费
这是阿里云上对顺序消息的定义,把顺序消息拆分成了顺序发布和顺序消费。那么多线程中发送消息算不算顺序发布?
如上一部分介绍的,多线程中若没有因果关系则没有顺序。那么用户在多线程中去发消息就意味着用户不关心那些在不同线程中被发送的消息的顺序。即多线程发送的消息,不同线程间的消息不是顺序发布的,同一线程的消息是顺序发布的。这是需要用户自己去保障的。
而对于顺序消费,则需要保证哪些来自同一个发送线程的消息在消费时是按照相同的顺序被处理的(为什么不说他们应该在一个线程中被消费呢?)。
全局顺序其实是分区顺序的一个特例,即使Topic只有一个分区(以下不在讨论全局顺序,因为全局顺序将面临性能的问题,而且绝大多数场景都不需要全局顺序)。
如何保证顺序
在MQ的模型中,顺序需要由3个阶段去保障:
-
消息被发送时保持顺序
-
消息被存储时保持和发送的顺序一致
-
消息被消费时保持和存储的顺序一致
发送时保持顺序意味着对于有顺序要求的消息,用户应该在同一个线程中采用同步的方式发送。存储保持和发送的顺序一致则要求在同一线程中被发送出来的消息A和B,存储时在空间上A一定在B之前。而消费保持和存储一致则要求消息A、B到达Consumer之后必须按照先A后B的顺序被处理。
如下图所示:
对于两个订单的消息的原始数据:a1、b1、b2、a2、a3、b3(绝对时间下发生的顺序):
-
在发送时,a订单的消息需要保持a1、a2、a3的顺序,b订单的消息也相同,但是a、b订单之间的消息没有顺序关系,这意味着a、b订单的消息可以在不同的线程中被发送出去
-
在存储时,需要分别保证a、b订单的消息的顺序,但是a、b订单之间的消息的顺序可以不保证
-
a1、b1、b2、a2、a3、b3是可以接受的
-
a1、a2、b1、b2、a3、b3也是可以接受的
-
a1、a3、b1、b2、a2、b3是不能接受的
-
消费时保证顺序的简单方式就是“什么都不做”,不对收到的消息的顺序进行调整,即只要一个分区的消息只由一个线程处理即可;当然,如果a、b在一个分区中,在收到消息后也可以将他们拆分到不同线程中处理,不过要权衡一下收益
开源RocketMQ中顺序的实现
上图是RocketMQ顺序消息原理的介绍,将不同订单的消息路由到不同的分区中。文档只是给出了Producer顺序的处理,Consumer消费时通过一个分区只能有一个线程消费的方式来保证消息顺序,具体实现如下。
Producer端
Producer端确保消息顺序唯一要做的事情就是将消息路由到特定的分区,在RocketMQ中,通过MessageQueueSelector来实现分区的选择。
-
List<MessageQueue> mqs:消息要发送的Topic下所有的分区
-
Message msg:消息对象
-
额外的参数:用户可以传递自己的参数
比如如下实现就可以保证相同的订单的消息被路由到相同的分区:
long orderId = ((Order) object).getOrderId;
return mqs.get(orderId % mqs.size());
Consumer端
RocketMQ消费端有两种类型:MQPullConsumer和MQPushConsumer。
MQPullConsumer由用户控制线程,主动从服务端获取消息,每次获取到的是一个MessageQueue中的消息。PullResult中的List msgFoundList自然和存储顺序一致,用户需要再拿到这批消息后自己保证消费的顺序。
对于PushConsumer,由用户注册MessageListener来消费消息,在客户端中需要保证调用MessageListener时消息的顺序性。RocketMQ中的实现如下:
-
PullMessageService单线程的从Broker获取消息
-
PullMessageService将消息添加到ProcessQueue中(ProcessMessage是一个消息的缓存),之后提交一个消费任务到ConsumeMessageOrderService
-
ConsumeMessageOrderService多线程执行,每个线程在消费消息时需要拿到MessageQueue的锁
-
拿到锁之后从ProcessQueue中获取消息
保证消费顺序的核心思想是:
-
获取到消息后添加到ProcessQueue中,单线程执行,所以ProcessQueue中的消息是顺序的
-
提交的消费任务时提交的是“对某个MQ进行一次消费”,这次消费请求是从ProcessQueue中获取消息消费,所以也是顺序的(无论哪个线程获取到锁,都是按照ProcessQueue中消息的顺序进行消费)
顺序和异常的关系
顺序消息需要Producer和Consumer都保证顺序。Producer需要保证消息被路由到正确的分区,消息需要保证每个分区的数据只有一个线程消息,那么就会有一些缺陷:
-
发送顺序消息无法利用集群的Failover特性,因为不能更换MessageQueue进行重试
-
因为发送的路由策略导致的热点问题,可能某一些MessageQueue的数据量特别大
-
消费的并行读依赖于分区数量
-
消费失败时无法跳过
不能更换MessageQueue重试就需要MessageQueue有自己的副本,通过Raft、Paxos之类的算法保证有可用的副本,或者通过其他高可用的存储设备来存储MessageQueue。
热点问题好像没有什么好的解决办法,只能通过拆分MessageQueue和优化路由方法来尽量均衡的将消息分配到不同的MessageQueue。
消费并行度理论上不会有太大问题,因为MessageQueue的数量可以调整。
消费失败的无法跳过是不可避免的,因为跳过可能导致后续的数据处理都是错误的。不过可以提供一些策略,由用户根据错误类型来决定是否跳过,并且提供重试队列之类的功能,在跳过之后用户可以在“其他”地方重新消费到这条消息。