案例中丢失消息的原因
- 客户端版本为2.6.x,客户端机器可能在尝试连接 Kafka broker 时因为超时断开,导致无法成功接收或更新 metadata
- 由于客户端没有及时感知到broker重新选举的metadata数据,一直尝试向宕机的broker发送mq,在2分钟后超时,最终失败导致部分mq消息丢失
如何解决该问题场景?
升级版本至2.7.0(含)之上。官方如何修复的该问题
KIP-601 修复了两个主要问题:
-
可配置的连接超时:引入了
socket.connection.setup.timeout.ms
参数,允许客户端配置连接建立的超时时间。如果客户端在设定的时间内无法与 broker 建立连接,它将超时并进行失败处理。- socket.connection.setup.timeout.ms:客户端等待建立初始套接字连接的时间。如果在超时之前未建立连接,则网络客户端将关闭套接字通道。默认值为 10 秒。
- socket.connection.setup.timeout.max.ms:客户端等待建立初始套接字连接的最大时间。每次连续连接失败后,连接建立超时时间将呈指数增长,直至达到此最大值。为了避免连接风暴,将对退避应用 0.2 的随机化因子,从而产生一个介于计算值以下 20% 和以上 20% 之间的随机范围。默认值为 127 秒。
- 优化节点选择:在没有现有连接通道时,客户端会选择那些失败次数较少的节点,以优化重新连接的尝试。
修复该问题后kafka就安全可靠了吗?在某些场景依然会存在问题。要想保障消息的可靠,可以采用两个常见的可靠性保障方案
客户端可靠性
- 同步发送,并且感知到失败时写入DB,定时任务进行兜底重试,保证最终一致性
- 异步发送,增加redis缓存唯一key,成功接收到callback时移除redis缓存,定时任务兜底检查redis缓存中失败的mq进行重试
broker可靠性
- 采用高等级配置acks=ALL
kafka客户端按照批次发送消息会丢失消息吗?
Kafka 的批次消息(batch)在客户端宕机时可能会导致消息丢失,但具体取决于几个因素,包括客户端配置和 Kafka 的容错机制。
影响因素:
-
客户端的批次处理:
- 生产者在发送消息时,通常会将多条消息打包成一个批次(batch),然后一起发送给 Kafka broker。如果客户端在批次发送完成之前宕机,那么这个批次中的消息还未发送到 Kafka,可能会丢失。
-
客户端的重试和幂等性:
-
重试机制的局限性:Kafka 的重试机制(通过
retries
参数)可以处理网络故障或临时 broker 不可用的情况,但并不能应对客户端宕机。如果客户端在批次消息尚未发送或部分消息未确认(ACK)时宕机,客户端重启后没有保留重试上下文,这些消息是无法被重新发送的,因为客户端已丢失相关的内存状态。 -
幂等性生产者:如果启用了
enable.idempotence=true
,Kafka 可以保证每条消息只被 broker 接收一次,即使客户端重启后重试,消息不会重复发送。但这并不能防止客户端宕机导致的批次消息丢失,特别是在消息还没有被发送到 Kafka 的情况下。
-
重试机制的局限性:Kafka 的重试机制(通过
-
事务性生产者:
- 如果使用了事务性生产者(
transactional.id
配置),Kafka 可以提供更高的可靠性,确保整个事务内的所有消息要么成功提交,要么完全回滚。如果生产者在事务未完成前宕机,Kafka 会回滚未提交的事务,防止部分消息提交。这可以防止一部分消息丢失,但仍然无法恢复那些从未发送到 broker 的消息。
- 如果使用了事务性生产者(
-
acks 配置:
-
acks=all
可以确保消息被写入到所有同步副本中后才认为发送成功。如果客户端在发送消息前宕机,消息没有到达 broker,自然也无法通过副本机制进行恢复。因此,未成功发送的批次消息依然会丢失。
-
小结:
Kafka 的批次消息如果客户端在消息发送之前宕机,未发送的消息会丢失。为了减少此类情况的发生,可以启用幂等性、事务性和合理的重试机制,但这些措施主要是为了避免重复和部分消息丢失,不能完全避免宕机时未发送批次的丢失。