【RocketMQ入门到精通】— RocketMQ中级特性能力 | 解释一下顺序消息原理

时间:2022-11-15 11:03:53

名言警句


任何先进的技术均与魔法无异


追本溯源

【​​经历了6个月的失踪,我将带着干货终究归来!【RocketMQ入门到精通】​​】


什么是消息的顺序性

消息的顺序性指的是在消息消费时,能按照发送的顺序来消费。例如:针对于商城服务的的下单到付款的流程中,会产生三条业务消息,它们分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。这时候就必须要考虑保证消息有序,如下图所示。

【RocketMQ入门到精通】— RocketMQ中级特性能力 | 解释一下顺序消息原理


顺序消息的分类

顺序消息分为全局顺序消息分区顺序消息全局顺序是指某个Topic下的所有消息都要保证顺序部分顺序消息只要保证每一组消息被顺序消费即可


全局顺序

对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。

  • 适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景。

分区顺序

对于指定的一个 Topic,所有消息根据sharding key进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。

  • 适用场景:性能要求高,以sharding key作为分区字段,同一个区块严格的按照FIFO原则进行消息发布和消费的场景。

普通顺序消息(Normal Ordered Message)

普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。


严格顺序消息(Strictly Ordered Message)

严格顺序消息模式下,消费者收到的所有消息均是有顺序的。

顺序消息缺陷

  • ​发送顺序消息无法利用集群 FailOver 特性
  • 消费顺序消息的并行度依赖于队列数量
  • 队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题
  • 遇到消息失败的消息,无法跳过,当前队列消费暂停​

顺序消息的实现

对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。在 Apache RocketMQ 中支持分区顺序消息,如下图所示。我们可以按照某一个标准对消息进行分区(比如根据ShardingKey进行分区),同一个ShardingKey的消息会被分配到同一个队列中,并按照顺序被消费。

需要注意的是 RocketMQ 消息的顺序性分为两部分,生产顺序性和消费顺序性。只有同时满足了生产顺序性和消费顺序性才能达到上述的FIFO效果。

RocketMQ的生产顺序性

RocketMQ通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。如需保证消息生产的顺序性,则必须满足以下条件:

  • 单一生产者消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的分区键,不同生产者之间产生的消息也无法判定其先后顺序。
  • 串行发送生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序

满足以上条件的生产者,将顺序消息发送至服务端后,会保证设置了同一分区键的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:

【RocketMQ入门到精通】— RocketMQ中级特性能力 | 解释一下顺序消息原理

顺序消息的应用场景也非常广泛,在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。

例如,创建订单的场景,需要保证同一个订单的生成、付款和发货,这三个操作被顺序执行。如果是普通消息,订单A的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时将ShardingKey相同(同一订单号)的消息序路由到一个逻辑队列中。

【RocketMQ入门到精通】— RocketMQ中级特性能力 | 解释一下顺序消息原理

生产者顺序消息

顺序消息的代码手动实现如下所示:

public class Producer {
public static void main(String[] args) throws UnsupportedEncodingException {
try {
DefaultMQProducer producer = new DefaultMQProducer("test_group");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);

System.out.printf("%s%n", sendResult);
}

producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
}

这里的区别主要是调用了​​SendResult send(Message msg, MessageQueueSelector selector, Object arg)​​方法,MessageQueueSelector 是队列选择器,arg 是一个 Java Object 对象,可以传入作为消息发送分区的分类标准。

MessageQueueSelector的接口如下:

public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

其中mqs是可以发送的队列,msg是消息,arg是上述send接口中传入的Object对象,返回的是该消息需要发送到的队列。上述例子里,是以orderId作为分区分类标准,对所有队列个数取余,来对将相同orderId的消息发送到同一个队列中。

生产环境中建议选择最细粒度的分区键进行拆分,例如,将订单ID、用户ID作为分区键关键字,可实现同一终端用户的消息按照顺序处理,不同用户的消息无需保证顺序。

顺序消息的一致性(系统支持)

如果一个Broker掉线,那么此时队列总数是否会发化?如果发生变化,那么同一个 ShardingKey 的消息就会发送到不同的队列上,造成乱序。如果不发生变化,那消息将会发送到掉线Broker的队列上,必然是失败的。因此 Apache RocketMQ 提供了两种模式,如果要保证严格顺序而不是可用性,创建 Topic 是要指定 ​​-o​​ 参数(--order)为true,表示顺序消息:

$ sh bin/mqadmin updateTopic -c DefaultCluster -t TopicTest -o true -n 127.0.0.1:9876
create topic to 127.0.0.1:10911 success.
TopicConfig [topicName=TopicTest, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=true, attributes=null]

其次要保证NameServer中的配置 ​​orderMessageEnable​​ 和 ​​returnOrderTopicConfigToBroker​​ 必须是 true。如果上述任意一个条件不满足,则是保证可用性而不是严格顺序。

消费组顺序消息

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
package org.apache.rocketmq.example.order2;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
*/
public class ConsumerInOrder {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("127.0.0.1:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}