文章目录
- 前言
- MQ如何保证消息不丢失
- RabbitMQ
- RocketMQ
- KafkaMQ
- MQ如何保证顺序消息
- RabbitMQ
- RocketMQ
- Kafka
- MQ刷盘机制/集群同步
- RabbitMQ
- RocketMQ
- Kafka
- 广播消息&集群消息
- RabbitMQ
- RocketMQ
- MQ集群架构
- RabbitMQ
- RocketMQ
- Kafka
- 消息重试
- RabbitMQ
- RockeMq
- Kafka
- 死信队列
- RocketMQ
- Kafka
- 消息去重
- RocketMQ
- Kafka
- 事务消息
- RabbitMQ
- RocketMQ
- Kafka
- 消息积压
- RabbitMQ
- 设计MQ思路
- 博客记录
前言
消息丢失的三种情况
- 消息在传入服务过程中丢失
- MQ收到消息,暂存内存中,还没消费,自己挂掉了,内存中的数据搞丢
- 消费者消费到了这个消息,但还没来得及处理,就挂了,MQ以为消息已经被处理
也就是生产者丢失消息、消息列表丢失消息、消费者丢失消息;
MQ如何保证消息不丢失
RabbitMQ
一、生产者
开启RabbitMQ事务
生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。
// 开启事务
channel.txSelect
try {
// 这里发送消息
} catch (Exception e) {
channel.txRollback
// 这里再次重发这条消息
}
// 提交事务
channel.txCommit
设置Confirm模式:
同步确认:
//开启发布确认
channel.confirmSelect();
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
//服务端返回 false 或超时时间内未返回,生产者可以消息重发
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("消息发送成功");
}
异步确认 :
略
服务端 :
消息持久化,必须满足以下三个条件,缺一不可。
-
Exchange 设置持久化
-
Queue 设置持久化
-
Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息
发送消息时设置delivery_mode属性为2
,使消息被持久化保存到磁盘,即使RabbitMQ服务器宕机也能保证消息不丢失。同时,创建队列时设置durable属性为True,以确保队列也被持久化保存。
// 声明队列,并将队列设置为持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello, RabbitMQ!";
// 发送消息时将消息设置为持久化
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.build();
channel.basicPublish("", "myQueue", properties, "Hello, RabbitMQ".getBytes());
设置备份交换机:
Map<String, Object> arguments = new HashMap<>();
arguments.put("alternate-exchange", "myAlternateExchange");
channel.exchangeDeclare("myExchange", BuiltinExchangeType.DIRECT, true, false, arguments);
channel.exchangeDeclare("myAlternateExchange", BuiltinExchangeType.FANOUT, true, false, null);
二 :服务端设置集群镜像模式
-
单节点模式: 最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。
-
普通模式: 消息只会存在与当前节点中,并不会同步到其他节点,当前节点宕机,有影响的业务会瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。
-
镜像模式: 消息会同步到其他节点上,可以设置同步的节点个数,但吞吐量会下降。属于RabbitMQ的HA方案
消费方开启消息确认机制 :
// 开启消息确认机制
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
// 手动发送消息确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
手动确认 :
codechannel.basicConsume("myQueue", false, (consumerTag, delivery) -> {
// 处理消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, consumerTag -> {});
RocketMQ
一、生产者提供SYNC的发送消息方式,等待broker处理结果。
RocketMQ生产者提供了3种发送消息方式,分别是:
同步发送
:Producer 向 broker 发送消息,阻塞当前线程等待 broker 响应发送结果。
Message msg = new Message("TopicTest",
"TagA","OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//同步传递消息,消息会发给集群中的⼀个Broker节点。
SendResult sendResult = producer.send(msg);
异步发送
:Producer 首先构建一个向 broker 发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果。
Message msg = new Message("TopicTest","TagA","OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
Oneway发送
:Oneway 方式只负责发送请求,不等待应答,Producer只负责把请求发出去,而不处理响应结果。
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
//核⼼:发送消息。没有返回值,发完消息就不管了,不知道有没有发送消息成功
producer.sendOneway(msg);
SendResult
定义说明(来自RocketMQ官方)
- SEND_OK
消息发送成功。要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。 - FLUSH_DISK_TIMEOUT
消息发送成功但是服务器刷盘超时。此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度,如果Broker服务器设置了刷盘方式为同步刷盘,即FlushDiskType=SYNC_FLUSH(默认为异步刷盘方式),当Broker服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超时。 - FLUSH_SLAVE_TIMEOUT
消息发送成功,但是服务器同步到Slave时超时。此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master即ASYNC_MASTER),并且从Broker服务器未在同步刷盘时间(默认为5秒)内完成与主服务器的同步,则将返回该状态——数据同步到Slave服务器超时。 - SLAVE_NOT_AVAILABLE
消息发送成功,但是此时Slave不可用。如果Broker服务器的角色是同步Master,即- SYNC_MASTER(默认是异步Master服务器即ASYNC_MASTER),但没有配置slave Broker服务器,则将返回该状态——无Slave服务器可用。
我们在调用producer.send方法时,不指定回调方法,则默认采用同步发送消息的方式,这也是丢失几率最小的一种发送方式(但是效率比较低)。
二、Borker 方面 : 设置成同步刷盘及同步复制;开启集群模式,集群同步;
1)异步刷盘
默认。消息写入 CommitLog 时,并不会直接写入磁盘,而是先写入 PageCache 缓存后返回成功,然后用后台线程异步把消息刷入磁盘。异步刷盘提高了消息吞吐量,但是可能会有消息丢失的情况,比如断点导致机器停机,PageCache 中没来得及刷盘的消息就会丢失。
2)同步刷盘
消息写入内存后,立刻请求刷盘线程进行刷盘,如果消息未在约定的时间内(默认 5 s)刷盘成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到这个响应后,可以进行重试。同步刷盘策略保证了消息的可靠性,同时降低了吞吐量,增加了延迟。要开启同步刷盘,需要增加下面配置:
flushDiskType=SYNC_FLUSH
同步复制后,消息复制流程如下:
-
slave 初始化后,跟 master 建立连接并向 master 发送自己的 offset;
-
master 收到 slave 发送的 offset 后,将 offset 后面的消息批量发送给 slave;
-
slave 把收到的消息写入 commitLog 文件,并给 master 发送新的 offset;
-
master 收到新的 offset 后,如果 offset >= producer 发送消息后的 offset,给 Producer 返回 SEND_OK。
三、消费者
RocketMQ消费失败后的消费重试机制
手动提交消息偏移量
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
public enum ConsumeConcurrentlyStatus {
//业务方消费成功
CONSUME_SUCCESS,
//业务方消费失败,之后进行重新尝试消费
RECONSUME_LATER;
}
RECONSUME_LATER “%RETRY%+ConsumeGroupName”—重试队列的主题
KafkaMQ
解决方案:
1、生产者调用异步回调消息。伪代码如下: producer.send(msg,callback);
2、生产者增加消息确认机制,设置生产者参数:acks = all。partition的leader副本接收到消息,等待所有的follower副本都同步到了消息之后,才认为本次生产者发送消息成功了;
3、生产者设置重试次数。比如:retries>=3,增加重试次数以保证消息的不丢失;
4、定义本地消息日志表,定时任务扫描这个表自动补偿,做好监控告警。
5、后台提供一个补偿消息的工具,可以手工补偿。
生产者设置同步发送 :
// 异步发送 默认
kafkaProducer.send(new ProducerRecord<>("first","kafka" + i));
// 同步发送
RecordMetadata first = kafkaProducer.send(new ProducerRecord<>("first", "kafka" + i)).get();
生产者设置发送ack :
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 设置 acks
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i));
生产者设置重试次数。比如:retries>=3,增加重试次数以保证消息的不丢失;
三、消费者:
通过在Consumer端设置“enable.auto.commit”属性为false后,
在代码中手动调用KafkaConsumer实例的commitSync()
方法提交,
这里指的是同步阻塞commit消费的偏移量,等待Broker端的返回响应,需要注意Broker端在对commit请求做出响应之前,消费端会处于阻塞状态,从而限制消息的处理性能和整体吞吐量以确保消息能够正常被消费。
如果在消费过程中,消费端突然Crash,这时候消费偏移量没有commit,等正常恢复后依然还会处理刚刚未commit的消息。
生产者acks参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对消息丢失的可能性有重要影响。
1)ack=0,生产者在成功写入悄息之前不会等待任何来自服务器的响应。
2)ack=1,只要集群的首领副本收到消息,生产者就会收到一个来自服务器的成功响应。
3)ack=all,只有当所有同步副本全部收到消息时,生产者才会收到一个来自 服务器的成功响应。
MQ如何保证顺序消息
严格顺序消费的注意事项
- 生产者不能异步发送,异步发送在发送失败的情况下,就没办法保证消息顺序。
比如你连续发了1,2,3。 过了一会,返回结果1失败,2, 3成功。你把1再重新发送1遍,这个时候顺序就乱掉了。
- 应用中应确保业务中添加事务锁,防止并发处理同一对象。
比如修改业务员的手机号,操作员A和操作员B同时修改业务员张三的手机号,如两人的填入手机号相同无影响,如不同,操作员A输入正确,操作员B输入错误,可能造成消费顺序乱掉,手机号修改错误。
- 对于消费端,不能并行消费,生产者顺序发送,消费端必须顺序消费。
RabbitMQ
消息单个消费者单线程消费。
RocketMQ
RocketMQ 提供了两种顺序消息模式:全局顺序消息和分区顺序消息。
- 全局顺序消息:适用于性能要求不高的场景,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。全局顺序消息实际上是一种特殊的分区顺序消息,即Topic中只有一个分区,因此全局顺序和分区顺序的实现原理相同。由于分区顺序消息有多个分区,所以分区顺序消息比全局顺序消息的并发度和性能更高。
- 分区顺序消息:适用于性能要求高的场景,所有消息根据Sharding Key进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。分区顺序消息比全局顺序消息的并发度和性能更高。
发送方使用MessageQueueSelector
选择队列 :
Message msg = new Message("OrderTopicTest", "order_"+orderId,
"KEY" + orderId,("order_"+orderId+" step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));
//消息队列的选择器
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
//第一个参数:所有的消息,第二个参数:发送的消息,第三个参数:根据什么发送,这里面传的是orderId
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
//同一个订单id可以放到同一个队列里面去
}, orderId);
消费方使用MessageQueueSelector
选择队列 :
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
//自动提交
context.setAutoCommit(true);
for(MessageExt msg:msgs){
System.out.println("收到消息内容 "+new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
Kafka
Kafka是分布式多partition的,它会将一个topic中的消息尽可能均匀的分发到每个partition上。那么问题就来了,这样怎么保证同一个topic消息的顺序呢?
kafka可以通过partitionKey,将某类消息写入同一个partition,一个partition只能对应一个消费线程,以保证数据有序。
也就是说生产者在写消息的时候,可以指定一个 key,比如说我们指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。
先后两条消息发送时,前一条消息发送失败,后一条消息发送成功,然后失败的消息重试后发送成功,造成乱序。为了解决重试机制引起的消息乱序为实现Producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number。
对于每个PID,该Producer发送消息的每个<Topic, Partition>都对应一个单调递增的Sequence Number。
同样,Broker端也会为每个<PID, Topic, Partition>维护一个序号,并且每Commit一条消息时将其对应序号递增。
对于接收的每条消息,如果其序号比Broker维护的序号大一,则Broker会接受它,否则将其丢弃
如果消息序号比Broker维护的序号差值比一大,说明中间有数据尚未写入,即乱序,此时Broker拒绝该消息
如果消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息
发送失败后会重试,这样可以保证每个消息都被发送到broker
消费者从 partition 中取出来数据的时候,也一定是有顺序的。到这里,顺序还是 ok 的,没有错乱。
指定发送partition
的分区:
//没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值
// 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,
//kafkaProducer.send(new ProducerRecord<>("first","a","atguigu " + i), new Callback() {}
kafkaProducer.send(new ProducerRecord("first", 0, "", "atguigu" + i)
, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e == null) {
System.out.println(" 主题: " +
metadata.topic() + "->" + "分区:" + metadata.partition()
);
} else {
e.printStackTrace();
}
}
});
MQ刷盘机制/集群同步
RabbitMQ
RocketMQ
同步刷盘、异步刷盘
RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。
RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式:
1)异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,立刻返回发送端发送成功,有单独的线程执行刷盘;写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入。
2)同步刷盘方式:在返回写成功状态时,消息已经被写入磁盘。同步调用MappedByteBuffer的force()方法,同步等待刷盘结果,进行刷盘结果返回告知发送端。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。
同步刷盘还是异步刷盘,是通过Broker配置文件里的flushDiskType参数设置的,这个参数被设置成SYNC_FLUSH、ASYNC_FLUSH中的一个
消息存储时,先将消息存储到内存,再根据不同的刷盘策略进行刷盘
同步刷盘:
异步刷盘:
刷盘源码
同步复制、异步复制
如果一个broker组有Master和Slave,消息需要从Master复制到Slave上,有同步和异步两种复制方式。
- 同步复制是等Master和Slave均写成功后才反馈给客户端写成功状态;
- 异步复制方式是只要Master写成功即可反馈给客户端写成功状态
这两种复制方式各有优劣:
- 在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写入Slave,有可能会丢失;
- 在同步复制方式下,如果Master出故障,Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。
同步复制和异步复制是通过Broker配置文件里的brokerRole
参数进行设置的,这个参数可以被设置成ASYNC_MASTER、SYNC_MASTER、SLAVE
三个值中的一个。
实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式,尤其是SYNC_FLUSH方式,由于频繁的触发写磁盘动作,会明显降低性能。
通常情况下,应该把Master和Slave设置成ASYNC_FLUSH的刷盘方式,
主从之间配置成SYNC_MASTER的复制方式,这样即使有一台机器出故障,仍然可以保证数据不丢。
Kafka
Broker针对每个分区会创建一个分区目录,分区目录下面存放的是日志文件(.log)和索引文件(.index)
Kafka的刷盘策略主要有两种:同步刷盘(sync flush)和异步刷盘(async flush)。
同步异步刷盘的区别在于,消息存储在内存(memory)中以后,是否会等待执行完刷盘动作再返回,即是否会等待将消息中的消息写入磁盘中。kafka可以通过配置flush.message
和flush.ms
来设置刷盘策略,如果flush.message设置为5,表示每5条消息进行一次刷盘,如果flush.message设置为1,表示每一条消息都进行一次刷盘。如果flush.ms设置为1000,表示每过1000ms进行一次刷盘,如果flush.ms设置为5000,表示每过5000ms进行一次刷盘。
- 同步刷盘:每条消息被写入磁盘前,必须等待操作系统完成该消息的磁盘写入操作。这种方式可以确保数据不丢失,但由于每次消息都要等待磁盘I/O完成,因此会影响性能。在Kafka中,默认使用的是异步刷盘策略,因为它结合了多副本和基于日志的存储机制,通过复制和重放来保障数据的高可用性。异步刷盘的目的是为了提高吞吐量和适应高性能应用场景。不过,这种方法增加了数据丢失的风险,尤其是在系统发生故障的情况下。
- 异步刷盘:这是一种更轻量级的刷盘方式,它允许消息先被写入内存中的页缓存,然后在空闲时由操作系统异步地刷入磁盘。这样可以减少对性能的影响,尤其是当处理大量消息时。然而,由于没有立即将数据刷入磁盘,所以存在一定的数据丢失风险。Kafka提供了配置项
log.flush.interval.messages
和log.flush.interval.ms
来控制何时触发强制的刷盘操作。如果没有设置这些参数,那么Kafka会根据log.flush.scheduler.interval.ms
(默认值为3000毫秒)的时间间隔进行检查,以确定是否需要刷新所有日志到磁盘。需要注意的是,尽管可以通过这些参数来实现一定程度的控制,但是官方并不推荐依赖它们来强制刷盘,而是强调通过副本机制来保证数据的一致性和可靠性。
广播消息&集群消息
RabbitMQ
RocketMQ
RocketMQ主要提供了两种消费模式:集群消费以及广播消费。我们只需要在定义消费者的时候通过setMessageModel(MessageModel.XXX)方法就可以指定是集群还是广播式消费,默认是集群消费模式,即每个Consumer Group中的Consumer均摊所有的消息。
public class MQProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
// 创建DefaultMQProducer类并设定生产者名称
DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test");
// 设置NameServer地址,如果是集群的话,使用分号;分隔开
mqProducer.setNamesrvAddr("10.0.91.71:9876");
// 消息最大长度 默认4M
mqProducer.setMaxMessageSize(4096);
// 发送消息超时时间,默认3000
mqProducer.setSendMsgTimeout(3000);
// 发送消息失败重试次数,默认2
mqProducer.setRetryTimesWhenSendAsyncFailed(2);
// 启动消息生产者
mqProducer.start();
// 循环十次,发送十条消息
for (int i = 1; i <= 10; i++) {
String msg = "hello, 这是第" + i + "条同步消息";
// 创建消息,并指定Topic(主题),Tag(标签)和消息内容
Message message = new Message("CLUSTERING_TOPIC", "", msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送同步消息到一个Broker,可以通过sendResult返回消息是否成功送达
SendResult sendResult = mqProducer.send(message);
System.out.println(sendResult);
}
// 如果不再发送消息,关闭Producer实例
mqProducer.shutdown();
}
}
public class MQConsumerB