蹊源的Java笔记—消息队列
前言
在Java
的分布式应用中有一个提升性能的利器——消息队列,通过消息队列我们可以让很多操作“异步”地去执行,这种异步的操作可以帮助应用均匀地去处理大量请求涌入的情况,从而降低系统的访问压力,本篇博客将带领大家去了解我们经常使用的两种消息队列RabbitMQ
和Kafka
的原理。
Redis服务器可参考我的博客:蹊源的Java笔记—Redis服务器
分布式可参考我的博客:蹊源的Java笔记—分布式
正文
消息队列之RabbitMQ
RabbitMQ是基于下面两种基础来实现:
-
Erlang语言:面向电信行业的函数式编程语言,它为
RabbitMQ
提供了节点之间消息通信轻量级线程,提供了状态无关的高并发性。 -
AMQP协议规范:它让
RabbitMQ
成为一个与供应商无关、平*立的解决方案,使其可以实现灵活的消息路由、配置化的消息持久以及跨数据中心通信。
RabbitMQ的功能
- 应用解耦:应用架构不再受限于数据库写入的性能瓶颈,应用只需要发送消息,不需要长时间占用线程等待响应;
-
数据库解耦:将直接存在数据库的数据发送到
RabbitMQ
,从而实现数据的异步处理。同时也可以通过消费者应用进行限流或者直接关闭,避免数据库崩溃。 -
数据同步:通过
RabbitMQ
可以将同一个数据存储到不同的数据库中,同样也可以把不同数据库的数据应用到同一个系统。 - 流量削峰:可以借助消息队列异步特性来降低系统的访问压力。
- 日志处理 :借助消息队列来异步处理日志文件
RabbitMQ与AMQP协议
AMQP
把客户端和代理服务器之间的通信数据拆分成一种叫做帧的块状结构。
低层AMQP帧的组成:
- 帧头:帧类型、信道编号、以字节为单位的帧大小
- 帧的有效载荷
- 结束字节标记
帧的类型
-
协议头帧:用于连接到
RabbitMQ
,仅使用一次。 -
心跳帧:客户端与
RabbitMQ
之间进行传递,作为一种校验机制确保连接的两端可以正常工作。 -
方法帧:携带发给
RabbitMQ
或从RabbitMQ
接收到的RPC
请求或响应。(告诉RabbitMQ
如何路由) - 内容头帧:包含一条消息的大小和属性。
- 消息体帧:包含消息的内容。
帧与消息
- 我们通常使用方法帧、内容头帧和消息帧向
RabbitMQ
发布消息。 - 一个消息通常以:方法帧、内容头帧以及一个或者多个消息体帧。(每个帧都有一定的大小限制,超过限制就会被拆分成多个)
- 传输过程中,方法帧和内容头帧会被打包成二进制,消息体帧不会进行任何打包或编码,它可以是任何数据类型。
Rabbit的抽象组件
-
交换机(Exchange):接收发送到
RabbitMQ
中的消息并决定把他们投递到那个队列的组件。 - 绑定(Binding):一套规则,用于告诉交换器消息应该被存储到哪个队列。
- 队列(Queue):用来存储消息的数据结构,位于硬盘或内存中。
- 信道(Channel):多路复用连接中的一条独立的双向数据流通道,信道会占用大量资源,要合理使用。
交换机的类别
- Direct交换机:是完全匹配、单播的模式。
- Fanout交换机:类似子网广播,所有与该交互机绑定的队列都会接受到一份消息的副本,这种方式是最快的。
- Topic交换机:通过 正则表达式的方式 实现一对多的绑定。
-
Headers交换机:它通过采用消息属性中的
headers
表结合正则表达式的方式 实现一对多的绑定(几乎不使用)
消息队列的匹配规则
1.消息队列模式
消息队列模式:发送者,接受者
2.主题消息模式
主题消息:发布者,订阅者
消息队列如何确保其消息的顺序性
通常来说有以下的思路:
- 单线程消费来确保消息的顺序性。
- 对消息进行编号,消费者处理时根据编号判断顺序。
RabbitMQ确保消息有序性
拆分多个queue
,每个queue
对应一个consumer
,然后这个consumer
内部用内存队列做排队,然后分发给底部不同worker
处理:
- 防止使用同一个队列,导致数据123进入不同的消费者,从而使得数据123没有按指定的顺序被执行
- 通过拆分
queue
来保证每一个消费者都能获得完整的数据123,然后消费者内部进行排队,从而保证消息的有序性。 - 这里同时也要设计,保证消息的幂等性。
消息队列如何保证其不会重复消费
简单来说,如何实现消息的幂等性,即消息执行一次和执行多次的结果是一样的。
保证数据不会重复消费,要结合业务来实现,比如:
- 基于数据库的主键索引的来实现
- 基于
Redis
来实现,使用set
操作具有天然的幂等性 - 通过先查一次数据,来判断是新增操作还是更新操作
- 通过向数据库前置一个布隆过滤器来判断数据是新数据还是旧数据,再使用主键索引来实现
消息队列如何保证消息不会丢失
消息从生产到消费可以经历三个阶段:
- 生产阶段:在这个阶段,从消息在生产者创建出来,经过网络传输到消息队列服务器中。
- 存储阶段:消息在消息队列服务器中存储,如果是集群,消息会在这个阶段被复制到其他的副本上。
- 消费阶段:消费者从消息队列服务器中拉取消息,通过网络传输发送到消费者。
各阶段保证数据的不丢失的方式:
- 生产阶段:失败回调机制、发布者确认、消息持久化
- 存储阶段:备用交换器、死信交换器、事务、高可用队列、基于事务的高可用队列、消息持久化
- 消费阶段:消费者确认、消息持久化
失败回调机制
将 mandatory
设置为true
,如果消息不可路由那么rabbitmq
会把完整的消息退回到发布者中
发布者确认
发布者确认,即Confirm
机制具体的实现方式:
-
spring.rabbitmq.template.mandatory = true
设置成true
-
spring.rabbitmq.publisher-confirms = true
设置成true
- 编写一个
java
类,实现 RabbitTemplate.ConfirmCallback
接口,在这个里面我们可以确认消息是否到达了RabbitMQ
服务器。
这里RabbitMq
提供一个回调函数可以将投递失败的消息给输出出来:
消费者确认机制
消费者确认机制,即应答模式:
- 消费者完成消费处理后,会发送一个消费应答,告诉消息队列服务器这个消息已经处理完成可以删除这个消息了
- 如果一个消费者由于宕机,没有发送消息应答,那么消息队列服务器会认为消息发送失败,自动进行补偿行为,即将这个消息重新加入队列,重新投递。
应答模式可以分为:
- 自动应答:不在乎消费者对消息处理是否成功,都会告诉队列删除消息。如果处理消息失败,实现自动补偿(队列投递过去重新处理)。
- 手动应答:消费者处理完业务逻辑,手动返回ack(通知)告诉队列处理完了,队列进而删除消息。
应答模式牺牲了消息队列的性能,从而提高了消息的可靠性。
备用交换器
备用交换器用于处理无法路由的消息。备用交换器在第一次声明交换器时被指定,用来提供一种预先存在的交换器,即如果交换器无法路由消息,那么消息就会被路由到这个新的备用交换器。
死信交换器
过期的消息、basic.nack
或basic.reject
且requeue
参数为false
或队列满的消息将进入此交换器
事务
采用AMQP
事务机制和发布者确认的机制,来解决服务器数据丢失的问题。
这种机制极大牺牲了性能从而换取消息的可靠性。
高可用队列
高可用队列又称为HA
队列,需要RabbitMQ
集群环境,可以通过使用AMQP
或者使用基于web
的管理界面来设置。
RabbitMQ
集群搭建结合Erlang
来实现其内部通信,借助Haproxy
实现请求的负载均衡。
基于事务的高可用队列
在一个集群的环境下,采用的是事务或投递确认机制,则消息在被HA
队列定义的所有活动节点确认之后,RabbitMQ
才会发送成功的响应。这种方式会造成很大的延迟。
消息持久化
消息持久化是解决消息被投递到RabbitMQ
的内存中,还没有投递到消费者实例之前就宕机了,而导致消息丢失的问题。
交换机的持久化:
-
new DirectExchange("log.user.exchange", true, false);
- 第二个参数
durable
: 是否持久化, - 第三个参数
autoDelete
: 当所有绑定队列都不再使用时, 是否自动删除交换器, true
: 删除, false
: 不删除。
queue的持久化:
-
new Queue("log.user.queue.name", true);
- 声明队列时指定持久化参数为
true
即可
message的持久化:
- 将
delivery-mode
设置为MessageDeliveryMode.PERSISTENT
,即可以实现message
的持久化。 - 在默认的情况下
message
都是持久化的。
知识点:
a.关于RabbitMQ
的配置要根据实际的业务需要,在可靠性和性能之间进行抉择:
- 使用
mandatory
设置,RabbitMQ
将不接受不可路由消息 - 发布者确认作为事务的轻量级替代方法
- 使用备用交换器处理无法路由的消息
- 基于事务的批量处理
- 使用
HA
队列避免节点故障 - 使用回推机制,拒绝接受发布者发布的过多消息
消费消息性能控制
RabbitMQ
实现了两个不同的AMQP RPC
命令来获取队列中的消息:
- Basic.Get:是一个轮询模型(这种模式的性能通常比后低二倍以上)
- Basic.Cunsume:是一个推送模型(即 发布-订阅模式)
所以使用推送模型,能很大程度提高RabbitMQ
的消费能力。
a.提升消费者消费消息的能力
RabbitMQ
提供一下的方式来提升消费消息的能力(性能依次降低)
1.基于no-ack模式进行消费
ack
模式即为应答模式,即消费者确认,我们可以通过关闭ack模式(默认是打开的)的方式实现更快的吞吐量。
- 但是我们要知道
RabbitMQ
将数据投递到消费者的过程中会进过系统自带的数据缓冲区。 - 由于缺少消费者确认,当系统出现网络波动会导致当前操作系统的套接字接收缓冲区爆满,从而影响程序的正常运行。
在linux
系统可以通过增加net_core.rmem_default
和net.core.rmem_max
值,通常设置为16M即可。
2.基于确认和Qos>1进行消费
Qos
服务质量设置,即在确认消息接收之前,消费者可以预先要求接收一定数量的消息:
-
QoS
设置允许RabbitMQ
通过为消费者预先分配一定数量的消息来实现更高效地消息发送 -
Qos
默认是1 ,默认每条消息都会确认,这种方式不可以与no-ack
同时设置。 - 它本身可以视为
ack
的一种优化,它不需要对每条消息进行确认,可以通过设置multiple
为true
,对所有以前没有进行确认的消息进行确认。
3.使用事务来批量进行
事务可能会对消息吞吐量产生负面影响,但有一个例外。如果你不适用QoS
设置,那么在使用事务来批量确认消息时,实际上可能会看到略微的性能提升。并且事务不适用于已禁止确认的消费者。
b.拒绝消息
当消息本身或消息处理的过程中出现问题,RabbitMQ提供了两种将消息踢回代理服务器的机制:
- Basic.Reject: 一次只允许拒绝一个消息
- Basic.Nack:一次可以拒绝多个消息
死信交换器:
- 过期的消息、
basic.nack
或basic.reject
且requeue
参数为false
或队列满的消息将进入此交换器。 -
RabbitMQ
通过死信交换器将消息路由到绑定的队列,就像正常发送给交换器的任何其他消息一样。 - 死信功能还允许你使用预先指定的值覆盖路由键(死信交换器即可以绑定一个非死信的队列和一个死信的队列)。这样可以允许你使用一个交换器同时处理死信消息和非死信消息。
- 但需要确保死信消息不被投递到非死信队列中,需要在声明队列时指定一个额外的参数
x-dead-letter-routing-key
c.消息队列如何解决消息堆积问题
消息回推机制
消息回推机制可以解决消息堆积的问题
如果发布者应用程序因为发布消息太快而开始对RabbitMQ
造成压力,那么RabbitMQ
将:
- 发送
Channel.FlowRPC
方法来让使发布者阻塞 - 只有发送另一条
Channel.Flow
命令,发布者才能解除阻塞状态继续发送消息
但是,在RabbitMQ2.0
之前存在发布者没有监听Channel.Flow
方法的极端情况。
在RabbitMQ3.2
之后
采用TCP
背压的机制来解决这种极端情况,即采用一个连接信用阈值的机制:
-
RabbitMQ
将根据RPC
请求的完成情况给每一个发布者打分,RabbitMQ
只处理有足够信用的发布者的消息。 - 同时借助
Connection.Blocked
和Connection.Unblocked
这两个异步方法,来通知客户端进行阻塞和取消阻塞。
知识点:
a.实际场景解决消息堆积问题的流程:
- 修复现有
consumer
的问题,并将其停掉。 - 重新创建一个容量更大的
topic
,比如patition
是原来的10倍。 - 编写一个临时
consumer
程序,消费原来积压的队列。该consumer
不做任何耗时的操作,将消息均匀写入新创建的队列里。 - 将修复好的
consumer
部署到原来10倍的机器上消费新队列。 - 消息积压解决后,恢复原有架构
核心思路,提高消费者的消费能力。
RabbitMQ集群
在RabbitMQ
集群里,运行时状态包含:
- 交换器
- 队列
- 绑定器
- 用户
- 虚拟主机
- 策略
它们对所有节点都可用。这种共享运行时状态的特性,使得集群中的每一个节点都能绑发布或者删除连接到第一个节点创建的交换器,因此当节点中出现节点宕机,其运行状态以及数据都是可以转移恢复的。
RabbitMQ提供了HA队列,它可以
- 跨越多个集群节点并共享同步队列状态和消息数据
-
HA
队列中的某个节点发生故障的话,集群中的其他节点仍然保存着消息和队列状态。 - 当故障的节点重新加入集群时,该节点会完全同步自节点故障以来所有被消费的消息。
- 通过
HA
队列,我们可以实现集群中有节点专门服务发布者,有节点专门服务于消费者。 - 一个消息被发送到
RabbitMQ
集群中的任何一个节点时,该消息会被路由到队列中去,无关队列在集群中的位置。
HA队列需要注意的地方
- 不能跨越
WAN
或者互联网来搭建RabbitMQ
集群 - 集群中的节点不宜过多,通常上限在32-64个,太多的节点会增加内部共享运行时状态的复杂性。
在RabbitMQ集群有三类节点
- 磁盘节点:将集群的运行时状态会同时存储在内存和磁盘中。
- 内存节点:只会把集群的运行时状态存储在内存中。
- 状态节点:在任意时刻,一个集群只能有一个统计节点,用来负责收集集群中每一个节点的全部统计数据和状态数据。
知识点:
- 磁盘节点和内存节点都可以进行数据持久化。
- 当节点或者集群崩溃时,在磁盘节点启动并重新加入集群时,会被用来重建集群的运行状态。而内存节点不会包含任何运行时数据,集群中其他节点会把队列定义等信息发送给它。
-
RabbitMQ
集群中最少要有一个磁盘节点,但是过多磁盘节点也会导致共享状态不一致的问题。
RabbitMQ集群间通讯
在RabbitMQ
中提供了两种方式可以实现跨越不同的RabbitMQ
集群间的通讯:
- 联合交换器:允许发往上游节点交换器的消息被透明地发送至下游节点中相同名称的交换器上。
- 联合队列:允许下游节点扮演上游节点*享队列的消费者角色,为下游节点提供了轮询消费消息的能力。
联合交换器
在一个RabbitMQ
集群中所有的节点之间要有一个低延迟的网络环境。
RabbitMQ
捆绑的一个插件可以实现下游RabbitMQ
可以从先前已经存在的RabbitMQ
服务器上获取消息。
- 某种意义上,这个联合插件的行为既像消费者又像消息发布者。它在上游节点消费消息,并在同一个节点上将这些消息进行重新发送。
- 联合交换器提供了一种简单、可靠、健壮的方式来扩展
RabbitMQ
的基础设施,实现了RabbitMQ
集群无法实现的跨网络延迟。 - 联合插件还能桥接逻辑上隔离的
RabbitMQ
集群,使得一个数据中心可以有两个不同版本的RabbitMQ
集群。
联合队列
- 采用联合队列的方式可以,解决消息的负载均衡,通过借助其他集群节点来消费队列消息,避免消息堆积现象。
- 某种意义上联合队列与联合交换器的配置没有本质上的区别,很多时候上两个是一起使用的。
通时借助联合插件我们还能实现:
- 提供消息处理的冗余(即容错性)
- 实现基于地理分布的应用
- 无缝的实现
RabbitMQ
集群的版本升级
消息队列之Kafka
Kafka是一个分布式流平台,它具以下关键功能:
- 消息传递系统:发布和订阅记录流,类似于消息队列或企业消息传递系统。
- 存储系统:以容错的持久方式存储记录流。
- 流处理:处理发生的记录流。
Kafka可以作为消息队列具有以下特性:
-
高可用:写入
Kafka
的数据将写入磁盘并进行复制以实现容错 -
持久性、可靠性:
Kafka
提供可生产者确认机制保证了数据的可靠性,并且消息都是以Record
的结构写入到本地文件,节点关闭不会丢失数据 -
高吞吐量、低延迟:
Kafka
每秒可以处理几十万条消息,它的延迟最低只有几毫秒 -
可扩展性:
Kafka
集群支持热扩展 - 高并发:支持数千个客户端同时读写
- 高性能:存储并允许客户端控制其读取位置(方便用户定位资源),依靠使用文件系统和依靠页面缓存,减少对内存的使用
知识点:
-
Kafka
集群将记录流存储在称为topic
的类别中.一个topic
可以由几个Partition
组成。 - 每个记录(
Record
)由一个键,一个值和一个时间戳组成。
主题和日志
对于每个主题,Kafka
群集都会维护一个分区日志,如下所示:
每个分区(Partition
)都是有序的(所以每一个Partition
内部都是有序的),不变的记录序列,这些记录连续地附加到结构化的提交日志中。
分区中的每个记录均分配有一个称为偏移的顺序ID
号,该ID
唯一地标识分区中的每个记录。
每个消费者保留的唯一元数据是该消费者在日志中的偏移量或位置。此偏移量由使用者控制:
- 通常,使用者在读取记录时会线性地推进其偏移量,但实际上,由于位置是由使用者控制的,因此它可以按喜欢的任何顺序使用记录。
- 例如,使用者可以重置到较旧的偏移量以重新处理过去的数据,或者跳到最近的记录并从“现在”开始使用。(类似于游标指针的方式顺序处理数据,并且该指标可以任意移动)
消息的有序性
Kafka
消息的有序性,是采用消息键保序策略来实现的。
一个topic
,一个partition
(分区),一个consumer
,内部单线程消费,写N个内存queue
,然后N个线程分别消费一个内存queue
。
- 通过指定
key
的方式,具有相同key
的消息会分发到同一个partition
-
partition
会内部对其进行排序,保证其有序性。
分区的设计结构
- 提供了负载均衡的能力,实现了系统的高伸缩性。
- 不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。
- 可以通过添加新的节点机器来增加整体系统的吞吐量。
-
Kafka
分区的设计逻辑和ES
分片的设计逻辑是相同的。
生产者分区策略
生产者分区策略是 决定生产者将消息发送到哪个分区的算法,即实现负载均衡。
主要有以下几种:
- 轮询策略:顺序分配,轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略。(默认、常用)
- 随机策略:所谓随机就是我们随意地将消息放置到任意一个分区上。
-
消息键保序策略:
Kafka
中每条消息都会有自己的key
,一旦消息被定义了 Key
,那么你就可以保证同一个 Key
的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的。
生产者批量发送
-
Producer
端可以在内存中合并多条消息后,以一次请求的方式发送了批量的消息给 broker
,从而大大减少 broker
存储消息的IO
操作次数。 - 但也一定程度上影响了消息的实时性,相当于以时延代价,换取更好的吞吐量。
Kafka的消息压缩机制
- 一般情况下压缩机制:在生产者端解压、Broker端保持、消费者端解压
-
Kafka
支持 4 种压缩算法:GZIP
、Snappy
、LZ4
,从 2.1.0 开始,Kafka
正式支持 Zstandard
算法(简写为 zstd
)。 - 压缩机制本质上是以消费者端
CPU
性能换取节省网络传输带宽以及Kafka Broker
端的磁盘占用。
生产者端压缩
- 生产者压缩通常采用的
GZIP
算法, Producer
启动后生产的每个消息集合都是经 GZIP
压缩过的 - 生产者压缩可以节省网络传输带宽以及
Kafka Broker
端的磁盘占用。 - 开启
GZIP
压缩:<entry key="compression.type" value="gzip"/>
Broker压缩
大部分情况下 Broker
从 Producer
端接收到消息后仅仅是原封不动地保存而不会对其进行任何修改。
但以下情况会引发Broker
压缩:
-
Broker
端和Producer
端采用了不同的压缩算法 -
Broker
端发生了消息格式转换
消费者端解压
-
Kafka
会将启用了哪种压缩算法封装进消息集合中,在Consumer
中进行解压操作。
消息可靠性机制
kafka提供以下特性来保证其消息的不丢失,从而保证消息的可靠性:
- 生产者确认机制
- 生产者失败回调机制
- 失败重试机制
- 消费者确认机制
- 副本机制
- 限定
Broker
选取Leader
机制
生产者确认机制
- 当
Kafka
的若干个 Broker
(根据配置策略,可以是一个,也可以是ALL
)
成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。 - 设置
acks = all
。如果设置成 all
,则表明所有副本 Broker
都要接收到消息,该消息才算是“已提交”。
生产者失败回调机制
- 生产者不要使用
producer.send(msg)
,而要使用 producer.send(msg,callback)
。记住,一定要使用带有回调通知的 send
方法。 -
producer.send(msg, callback)
采用异步的方式,当发生失败时会调用callback
方法。
失败重试机制
- 设置
retries
为一个较大的值。这里的 retries
同样是 Producer
的参数,对应前面提到的 Producer
自动重试。 - 当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了
retries > 0
的 Producer
能够自动重试消息发送,避免消息丢失。
消费者确认机制
- 确保消息消费完成再提交,确保如何消费失败了,消息还存在消息队列中。
-
Consumer
端有个参数 enable.auto.commit
,最好把它设置成 false
,并采用手动提交位移的方式。
副本机制
- 设置
replication.factor >= 3
。这也是 Broker
端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。 - 设置
min.insync.replicas > 1
。这依然是 Broker
端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。 - 确保
replication.factor > min.insync.replicas
。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1
。
限定Broker选取Leader机制
- 设置
unclean.leader.election.enable = false
。这是 Broker
端的参数,它控制的是哪些
Broker
有资格竞选分区的 Leader
。 - 如果一个
Broker
落后原先的 Leader
太多,那么它一旦成为新的 Leader
,必然会造成消息的丢失。 - 故一般都要将该参数设置成
false
,即不允许这种情况的发生。
消息幂等性和事务
由于kafka
生产者确认机制、失败重试机制的存在,kafka
的消息不会丢失但是存在由于网络延迟等原因造成重复发送的可能性:
-
kafka
提供了幂等性Producer
的方式来保证消息幂等性。 - 使用
<entry key="enable.idempotence" value="true"/>
的方式开启幂等性。
幂等性 Producer
的作用范围:
- 只能保证单分区上的幂等性,即一个幂等性
Producer
能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。 - 只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,可以理解为
Producer
进程的一次运行。当你重启了 Producer
进程之后,这种幂等性保证就丧失了。
Kafka事务
- 事务型
Producer
能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。 - 事务型
Producer
也不惧进程的重启。Producer
重启回来后,Kafka
依然保证它们发送消息的精确一次处理。 - 使用
<entry key="enable.idempotence" value="true"/>
的方式开启事务。
探究Kafka消费者的工作原理
消费者组
-
consumer group
是kafka
提供的可扩展且具有容错性的消费者机制。它是由一个或者多个消费者组成,它们共享同一个Group ID
. - 组内的所有消费者协调在一起来消费订阅主题(
subscribed topics
)的所有分区(partition
)。 - 每个分区只能由同一个消费组内的一个
consumer
来消费。
consummer group有以下的特性:
-
consumer group
下可以有一个或多个consumer instance
,consumer instance
可以是一个进程,也可以是一个线程(所以消费者可以采用多线程的方式去消费消息) -
group.id
是一个字符串,唯一标识一个consumer group
-
consumer group
下订阅的topic
下的每个分区只能分配给某个group
下的一个consumer
(当然该分区还可以被分配给其他group
)
消费者位置
消费者位置,即位移。 消费者在消费的过程中需要记录自己消费了多少数据。
位移提交有自动、手动两种方式进行位移提交。
-
自动提交:在
kafka
拉取到数据之后就直接提交,这样很容易丢失数据 - 手动提交:成功拉取数据之后,对数据进行相应的处理之后再进行提交。如拉取数据之后进行写入mysql这种 (存在数据处理失败的可能性),所以这时我们就需要进行手动提交kafka的offset下标。
- 关闭自动提交,使用spring实现的提交方案:
<entry key="enable.auto.commit" value="false"/>
Kafka
通过一个内置Topic(__consumer_offsets)
来管理消费者位移。
Rebalance机制
Rebalance
本质上是一种协议,规定了一个consumer group
下的所有consumer
如何达成一致来分配订阅topic的每个分区。
Kafka
提供了一个角色:coordinator
来执行对于consumer group
的管理。
-
Group Coordinator
是一个服务,每个Broker
在启动的时候都会启动一个该服务。 -
Group Coordinator
的作用是用来存储Group
的相关Meta
信息,并将对应Partition
的Offset
信息记录到Kafka
内置Topic(__consumer_offsets)
中。
Rebalance 过程分为两步:Join 和 Sync。
Join 顾名思义就是加入组:
- 所有成员都向
coordinator
发送JoinGroup
请求,请求加入消费组。 - 一旦所有成员都发送了
JoinGroup
请求,coordinator
会从中选择一个consumer
担任leader
的角色,并把组成员信息以及订阅信息发给leader
注意:leader
和coordinator
不是一个概念。leader
负责消费分配方案的制定。
Sync,这一步leader开始分配消费方案:
- 分配哪个
consumer
负责消费哪些topic
的哪些partition
。 - 一旦完成分配,
leader
会将这个方案封装进SyncGroup
请求中发给coordinator
,非leader
也会发SyncGroup
请求,只是内容为空。 -
coordinator
接收到分配方案之后会把方案塞进SyncGroup
的response
中发给各个consumer
。
Kafka过期数据清理的两种方式
删除(默认)
- 当删除的条件满足:定期或定大小的条件后日志将被“删除”
- 这里的删除其实只是将该日志进行了“
delete
”标注,文件只是无法被索引到了而已。 - 但是文件本身,仍然是存在的,只有当过了
log.segment.delete.delay.ms
这个时间以后,文件才会被真正的从文件系统中删除。
压缩
- 压缩时将根据
Key
将消息聚合,只保留最后一次出现时的数据。 - 这种策略只适合特殊场景,比如消息的
key
是用户ID
,消息体是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。 - 压缩策略支持删除,当某个
Key
的最新版本的消息没有内容时,这个Key
将被删除,这也符合以上逻辑。
Kafka如何防止消息丢失
Kafka生产者:
- 失败回调机制:处理网络抖动或者消息过大的问题
- 针对网络抖动,
Kafka
生产者可以通过retries=N
设置失败重试次数 -
kafka
的应答模式是配置在生产者上的,acks=all
配置所有的partition
副本都收到消息了才返回提交消息成功。
Kafka的Broker端:
- replication.factor>=3:消息分区的副本数量
- min.insync.replicas>1:消息写入多少个副本才算提交
- replication.factor>min.insync.replicas:保证可用性,如果相等,则任何一个副本挂了,则整个分区无法工作。
Kafka消费者
- 关闭自动更新
offset
,等到数据被处理后再手动跟新offset
。 - 开启了位移自动提交,多线程处理的时候,如果有一个线程出现问题,但是还是提交了位移,会发生消息丢失。消费者端
enable.auto.commit=false
,禁止手动提交位移,避免多线程消费的时候消息丢失
Kafka的高性能实现
Kafka通过以下四种方式实现百万级TPS:
- 顺序写入数据,在
Partition
末尾追加,所以速度最优。 - 使用
MMAP
技术将磁盘文件与内存映射,Kafka
可以像操作磁盘一样操作内存。 - 通过
DMA
技术实现零拷贝,减少数据传输次数。 - 读取数据时配合
sendfile
直接暴力输出,批量压缩把所有消息变成一个批量文件,合理减少网络IO
损耗。
顺序读写磁盘
- 生产者写入数据和消费者读取数据都是顺序读写的
- 每次接收到新数据后
Kafka
会把数据插入到文件末尾。 - 顺序写入会导致删除数据不方便,所以
Kafka
一般会把所有的数据都保留下来,每个消费者(对每个Topic
都有一个 offset
用来记录读取进度或者叫坐标。
MMAP技术
-
MMAP
也就是内存映射文件,在64位操作系统中一般可以表示 20G 的数据文件,它的工作原理是直接利用操作系统的 Page
来实现文件到物理内存的直接映射,完成映射之后对物理内存的操作会被同步到硬盘上。 - 通过
MMAP
技术进程可以像读写硬盘一样读写内存(逻辑内存),不必关心内存的大小,因为有虚拟内存兜底。这种方式可以获取很大的I/O
提升,省去了用户空间到内核空间复制的开销。
DMA技术
DMA
技术,就是直接内存访问,可以减少 CPU
的等待时间。
如果没有DMA
技术需四次数据传输,就可把数据放到网卡缓冲区:
- 从硬盘上将数据读到操作系统内核的缓冲区里,这个传输是通过
DMA
搬运的。 - 从内核缓冲区里面的数据复制到分配的内存里面,这个传输是通过
CPU
搬运的。 - 从分配的内存里面再写到操作系统的
Socket
的缓冲区里面去,这个传输是由 CPU
搬运的。 - 从
Socket
的缓冲区里面写到网卡的缓冲区里面去,这个传输是通过 DMA
搬运的。
通过DMA
技术Kafka
中只需进行两次数据传输,就可把数据放到网卡缓冲区:
- 通过
DMA
从硬盘直接读到操作系统内核的读缓冲区里面。 - 根据
Socket
的描述符信息直接从读缓冲区里面写入到网卡的缓冲区里面。
批量操作
-
Kafka
把所有的消息都存放在一个一个的文件中,当消费者需要数据的时候 Kafka
直接把文件发送给消费者。 - 虽然消费者只需要一条消息啊,
kafka
把整个文件都发送过来了, 但是消费者可以通过offset
记录消费进度。 - 发送文件还有一个好处就是可以对文件进行批量压缩,减少网络
IO
损耗。