消息队列概述

时间:2022-12-07 19:00:45


1. 两个概念

1.1 消息代理(message broker)

消息队列概述

1.2 目的地 (destination)

1.2.1 队列 (queue):点对点消息通信(point-to-point)

消息只有唯一的发送者发送和接收者接收,但不是说只能有一个接收者监听

1.2.2 主题 (topic): 发布(publish)/订阅(subscribe) 消息通信

发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息。

2.规范

2.1 JMS (java message sservice) java消息服务

java api

两种消息模型model:

peer-2-peer,pub/sub

多种消息类型:TextMessage,MapMessage,BytesMessage,StreamMessage,ObjectMessage,Message

eg: ActiveMQ

2.2 AMQP (advanced message queuing protocal) 高级消息队列协议

网络线级协议

五种消息模型model:

direct exchange,fanout exchange,topic change,headers exchange,system exchange

一种消息类型:byte[]

eg: RabbitMQ

3.Spring支持

@JmsListener @RabbitListener

@EnableJms @EnableRabbit

@JmsAutoConfiguration  @RabbitAutoConfiguration

4.消息队列作用

1. 异步处理

2. 应用解耦

3. ​​流量控制​​

5.消息队列的使用

5.1 异步

消息队列概述

5.2 并行

消息队列概述

5.3 排队

消息队列概述

6.消息队列产品对比

消息队列概述

7. 如何保证消息可靠性

消息队列概述

消息队列概述

 

消息队列概述

8. MQ 有哪些常见问题?如何解决这些问题?

(1)消息的顺序问题

消息有序指的是可以按照消息的发送顺序来消费。

假如生产者产生了 2 条消息:M1、M2,假定 M1 发送到 S1,M2 发送到 S2,如果要保证 M1 先于 M2 被消费,怎么做?

消息队列概述

解决方案:

1)保证生产者 - MQServer - 消费者是一对一对一的关系

缺陷:并行度就会成为消息系统的瓶颈(吞吐量不够)更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我 们不得不花费更多的精力来解决阻塞的问题。

2)通过合理的设计或者将问题分解来规避。不关注乱序的应用实际大量存在队列无序并不意味着消息无序 所以从业务层面来保证消息的顺序而不仅仅是依 赖于消息系统,是一种更合理的方式。

消息队列概述

(2)消息的重复问题

造成消息重复的根本原因是:网络不可达。
所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收 到两条一样的消息,应该怎样处理?
消费端处理消息的业务逻辑保持幂等性。只要保持幂等性,不管来多少条重复消 息,最后处理的结果都一样。保证每条消息都有唯一编号且保证消息处理成功与 去重表的日志同时出现。利用一张日志表来记录已经处理成功的消息的 ID,如 果新到的消息 ID 已经在日志表中,那么就不再处理这条消息。

9. RabbitMq

10. 面试题

10.1

消息队列概述

消息队列概述

消息队列概述


10.2

消息队列概述

10.3

消息队列概述

消息队列概述

10.4

消息队列概述

消息队列概述

消息队列概述

10.5 如何保证RabbitMQ消息的顺序性?

拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确 实是麻烦点;

或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。

10.6 消息如何分发?
 

若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。每条消息只会分发给一个订阅的消费者(前提是消费者能够正常处理消息并进行确认)。通过路由可实现多消费的功能。

10.7 消息怎么路由?

消息提供方->路由->一至多个队列消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。通过队列路由键,可以把队列绑定到交换器上。消息到达交换器后,RabbitMQ 会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则);常用的交换器主要分为一下三种:

fanout:如果交换器收到消息,将会广播到所有绑定的队列上

direct:如果路由键完全匹配,消息就被投递到相应的队列
topic:可以使来自不同源头的消息能够到达同一个队列。 使用 topic 交换器时,可以使用通配符

10.8 消息基于什么传输?

由于 TCP 连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ 使用信道的方式来传输数据。信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP 连接上的信道数量没有限制。

10.9 如何保证消息不被重复消费?或者说,如何保证消息消费时的幂等性?
 

先说为什么会重复消费:正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除;但是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者。

针对以上问题,一个解决思路是:保证消息的唯一性,就算是多次传输,不要让消息的多次消费带来影响;保证消息等幂性;比如:在写入消息队列的数据做唯一标示,消费消息时,根据唯一标识判断是否消费过;假设你有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。

消息队列概述

10.10 如何确保消息正确地发送至RabbitMQ? 如何确保消息接收方消费了消息?

消息队列概述

10.11 如何保证RabbitMQ消息的可靠传输?如何保证消息的可靠传输?如果消息丢了怎么办

数据的丢失问题,可能出现在生产者、MQ、消费者中

生产者丢失:生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题

啥的,都有可能。此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启

RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么

生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到

了消息,那么可以提交事务channel.txCommit。吞吐量会下来,因为太耗性能。所以一般来说,如

果你要确保说写 RabbitMQ 的消息别丢,可以开启confirm模式,在生产者那里设置开启confirm模

式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给

你回传一个ack消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。事务机制和cnofirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是

confirm机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息RabbitMQ 接收

了之后会异步回调你一个接口通知你这个消息接收到了。所以一般在生产者这块避免数据丢失,都

是用confirm机制的。

MQ中丢失:就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写

入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般

数据不会丢。设置持久化有两个步骤:创建 queue 的时候将其设置为持久化,这样就可以保证

RabbitMQ 持久化 queue 的元数据,但是不会持久化 queue 里的数据。第二个是发送消息的时候

将消息的 deliveryMode 设置为 2,就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久

化到磁盘上去。必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁

盘上重启恢复 queue,恢复这个 queue 里的数据。持久化可以跟生产者那边的confirm机制配合起

来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,

RabbitMQ 挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。注意,哪怕是你给

RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得

及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。

消费端丢失:你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,

RabbitMQ 认为你都消费了,这数据就丢了。这个时候得用 RabbitMQ 提供的ack机制,简单来

说,就是你关闭 RabbitMQ 的自动ack,可以通过一个 api 来调用就行,然后每次你自己代码里确

保处理完的时候,再在程序里ack一把。这样的话,如果你还没处理完,不就没有ack?那RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处

理,消息是不会丢的

消息队列概述

10.12 为什么不应该对所有的 message 都使用持久化机制?

消息队列概述

10.13 如何保证高可用的?RabbitMQ 的集群

消息队列概述

10.14 如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?

消息积压处理办法:

临时紧急扩容:
先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的10 倍速度来消费数据。等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。

MQ中消息失效:

假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12 点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面
去,把白天丢的数据给他补回来。也只能是这样了。假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。

mq消息队列块满了:

如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧

10.15 消息中间件如何保证消息的一致性

消息队列概述

消息队列概述

10.16 设计一个消息队列,你会怎么设计

比如说这个消息队列系统,我们从以下几个角度来考虑一下:
首先这个 mq 得支持可伸缩性吧,就是需要的时候快速扩容,就可以增加吞吐量和容量,那怎么
搞?设计个分布式的系统呗,参照一下 kafka 的设计理念,broker -> topic -> partition,每个
partition 放一个机器,就存一部分数据。如果现在资源不够了,简单啊,给 topic 增加 partition,
然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了?
其次你得考虑一下这个 mq 的数据要不要落地磁盘吧?那肯定要了,落磁盘才能保证别进程挂了数
据就丢了。那落磁盘的时候怎么落啊?顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读
写的性能是很高的,这就是 kafka 的思路。
其次你考虑一下你的 mq 的可用性啊?这个事儿,具体参考之前可用性那个环节讲解的 kafka 的高
可用保障机制。多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务。
能不能支持数据 0 丢失啊?可以的,参考我们之前说的那个 kafka 数据零丢失方案

10.17 mq 的缺点

消息队列概述

10.18 dead letter” queue 的用途

当消息被 RabbitMQ server 投递到 consumer 后,但 consumer 却通过 Basic.Reject 进行了拒绝时(同时设置 requeue=false),那么该消息会被放入“dead letter” queue 中。该queue 可用于排查 message 被 reject 或 undeliver 的原因。