关于Kafka区分请求处理优先级的讨论

时间:2022-11-26 11:16:15

所有的讨论都是基于KIP-291展开的。抱歉,这又是一篇没有图的文字。


目前Kafka broker对所有发过来的请求都是一视同仁的,不会区别对待。不管是用于生产消费的PRODUCE和FETCH请求,还是controller端发送的LeaderAndIsr/StopReplica/UpdateMetadata请求,亦或是其他类型的请求也是一样。通常我们这里把PRODUCE/FETCH请求称为数据类请求;把controller发送的那3种请求称为控制类请求或controller类请求——在源码中前者被称为data plane request,后者称为controller plane request。

这种公平处理原则在很多场合下都是不合理的。为什么?简单来说控制类请求具有直接令数据类请求失效的能力。举个例子,如果我有个topic,单分区双副本,其中broker0上保存leader副本,broker1上保存follower副本。当broker0上积压了大量的PRODUCE请求时,此时用户执行了重分区或preferred分区选举将broker1变更成了leader,那么controller会向broker0发送LeaderAndIsr请求告诉它现在是一个follower了,而broker1上的follower已经停止向leader拉取数据(因为它要成为leader了)——此时一个比较尴尬的情形出现了:如果producer的acks设置的是all,那么这些在LeaderAndIsr请求之前积压的PRODUCE请求就无法正常完成——要么一直缓存在purtagory中要么请求超时返回给client。设想一下,如果Kafka能够及时地处理LeaderAndIsr请求,那么这些积压的PRODUCE请求就能立即失败(NOT_LEADER_FOR_PARTITION),马上返回给client。Client不用等到 purgatory中的请求超时,降低了请求的处理时间。即使acks不是all,纵然积压的PRODUCE请求写入本地日志后成功返回,但处理过LeaderAndIsr请求后broker0上副本变为follower,还要执行截断(truncation),因此在client看来这些消息就丢失了。

再举一个例子,同样是在积压大量数据类请求的broker上,如果用户删除了topic,那么StopReplica请求无法及时处理,导致topic无法真正删除,增加了删除topic的延时。

最后还可以举个例子说明对UpdateMetadata的影响。如果UpdateMetadata不能及时处理,broker上保存的就是过期的元数据,当client获取到这些数据时,不管是producer还是consumer都可能无法正常工作,直到获取到最新的元数据信息。

通过上面3个例子可以看出通常情况下我们希望controller类请求的处理优先级要高于数据类请求,这也是社区做KIP-291的初衷 。可喜的是Kafka 2.2正式实现了这个功能,下面我们来看看社区是怎么做的:

其实在KIP-291之前,我也思考过这个问题。当时我提出的想法是这样的:在broker的KafkaRequestHandlerPool中实现一个优先级队列,当controller类请求到达时,它能够”抢占式“地排在处理队列的最前部——这是很自然的想法,所以我本以为KIP-291也是这么实现的,但通篇看下来我尴尬地发现我这个解决思路记录在“Rejected Alternatives"中。这个方案最大的问题在于它无法处理队列已满的情形,即当处理队列已经无法容纳任何新的请求时该如何支持优先处理controller类请求?纵然有优先级队列也无法解决这个问题。

KIP-291是怎么解决的呢?很简单,Kafka重新为controller类请求做了专属的监听器+请求队列+acceptor+processor线程。监听器通过Kafka的listeners和advertised.listeners设置,新的请求队列则专门保存controller类请求,而acceptor和processor线程负责接收网络发送过来的以及处理队列中的controller类请求。我们一个一个说吧。

当前,用户可以在listeners中指定多套监听器,比如PLAINTEXT://kafka1:9092, SSL://kafka1:9093。你其实也可以自定义你的监听器,比如INTERNAL://kafka1:9094。用户可以指定broker端参数inter.broker.listener.name或security.inter.broker.protocol(两个不能同时指定)来设定,同时你还需要在listener.security.protocol.map中指定这个自定义listener使用的安全协议,比如: listener.security.protocol.map=INTERNAL:PLAINTEXT。KIP-291复用了这个设计,如果你设置了inter.broker.listener.name或security.inter.broker.protocol,Kafka会默认使用这个listener专属服务controller类请求。同时社区还引入了一个新的参数:control.plane.listener.name,用来专门让你设置服务controller类请求的监听器名称。这个参数的优先级要高于前面那两个参数,因此还是推荐用户直接设置此参数,比如设置control.plane.listener.name=CONTROLLER,同时更新listener.security.protocol.map,增加CONTROLLER:PLAINTEXT匹配对(假设你用的是PLAINTEXT)。这就是为controller类请求创建监听器的方法。

下面说请求队列和acceptor、processor线程。 其实也不用细说,和现有的设计一模一样,只是默认的队列大小不再是500,而是20,默认的线程数不再是8而是2,因为我们假设controller类请求通常不应该有积压。具体的实现原理有兴趣的话直接读KafkaRequestHandlerPool.scala、RequestChannel.scala和SocketServer.scala源码吧。还需要修改的地方是controller代码,特别是在ControllerChannelManager.scala中增加新的broker时一定要使用controller类请求专属的监听器。

除了以上这些,该KIP也引入了很多监控controller类请求处理的JMX指标,如队列请求数、线程空闲程度等,这些和之前的指标都是一样的,只是仅监控controller plane监听器之用。再说一点,当前Kafka支持动态地调整请求处理线程数。在对请求进行区分处理后,我估计后续也要支持对controller类请求线程数的动态调整吧。

总体来说,将请求做区分处理后对于繁忙Kafka集群将能够更迅速地处理控制类请求,表现为状态的更新更加及时,集群不一致状态窗口将会缩小,同时还提升了整体可用性。目前该KIP还只是对请求做两类处理,也许日后会做一些更加细粒度的区分——比如Metadata请求是否也应该享有更高的优先级处理。

最后还想提一句,KIP-291是我认为近期社区改动影响比较大的两个KIP之一。另一个则是KIP-392——还记得Kafka不能从follower副本读数据的限制吧?这个KIP要打破这个限制!只是目前该KIP还在讨论中,我们后面拭目以待吧。