摘自: 《kafka权威指南》
集群间成员关系
Kafka 使用Zoo keeper 来维护集群成员的信息。每个broker 都有一个唯一标识符,这个标识符可以在配置文件里指定,也可以自动生成。在broker 启动的时候,它通过创建
临时节点把自己的ID 注册到Zookeeper 。Kafka 组件订阅Zoo keeper 的/brokers/ids 路径(bro ker 在Zoo keeper 上的注册路径),当有broker 加入集群或退出集群时,这些组件就可以获得通知。
如果你要启动另一个具有相同ID 的broker ,会得到一个错误一一新broker 会试着进行注册,但不会成功,因为Zoo keeper 里已经有一个具有相同ID 的broker 。在broker 停机、出现网络分区或长时间垃圾回收停顿时, broker 会从Zookeeper 上断开连接,此时broker 在启动时创建的临时节点会自动从Zoo keeper 上移除。监听broker 列表的
Kafka 组件会被告知该broker 已移除。
在关闭broker 时,它对应的节点也会消失,不过它的ID 会继续存在于其他数据结构中。例如,主题的副本列表(下面会介绍)里就可能包含这些白。在完全关闭一个broker 之后,如果使用相同的m 启动另一个全新的broker ,它会立即加入集群,井拥有与旧broker相同的分区和主题。
上面说的kafka启动之后会向zk注册id,可以在zk上查看这些注册的消息!
[zk: localhost:(CONNECTED) ] ls / #显示当前zk上的消息信息
[isr_change_notification, zookeeper, admin, consumers, config, controller, brokers, controller_epoch]
[zk: localhost:(CONNECTED) ] ls /brokers/ids #显示注册的brkers的id。
[, , ] #可以尝试着停掉一个对应kafka,对应的broker id就会自动从这里删除。
控制器
控制器是kafka集群中蛮重要的一个组件,下面我们会说明broker如何成为控制器的。
推荐一篇控制器的博文:https://blog.csdn.net/u013256816/article/details/80865540
控制器其实就是一个broker,只不过它除了具有一般broker 的功能之外,还负责分区首领的选举。集群里第一个启动的broker通过在zookeeper里创建一个临时节点/controller让自己成为控制器。其他broker 在启动时也会尝试创建这个节点,不过它们会收到一个“节点已存在”的异常,然后“意识”到控制器节点已存在,也就是说集群里已经有一个控制器了。其他broker在控制器节点上创建Zookeeper watch对象,这样它们就可以收到这个节点的变更通知。这种方式可以确保集群里一次只有一个控制器存在。
如果控制器被关闭或者与Zookeeper 断开连接,Zookeeper上的临时节点就会消失。集群里的其他broker 通过watch 对象得到控制器节点消失的通知,它们会尝试让自己成为新的控制器。第一个在Zookeeper 里成功创建控制器节点的broker 就会成为新的控制器,其他节点会收到“节点已存在”的异常,然后在新的控制器节点上再次创建watch 对象。每个新选出的控制器通过Zookeeper 的条件递增操作获得一个全新的、数值更大的controller epoch。其他broker 在知道当前controller epoch后,如果收到由控制器发出的包含较旧epoch 的消息,就会忽略它们。
当控制器发现一个broker已经离开集群(通过观察相关的Zookeeper 路径),它就知道,那些失去首领的分区需要一个新首领(这些分区的首领刚好是在这个broker 上)。控制器遍历这些分区,并确定谁应该成为新首领(简单来说就是分区副本列表里的下一个副本),然后向所有包含新首领或现有跟随者的broker 发送请求。该请求消息包含了谁是新首领以及谁是分区跟随者的信息。随后,新首领开始处理来自生产者和消费者的请求,而跟随者
开始从新首领那里复制消息。
当控制器发现一个broker加入集群时,它会使用broker来检查新加入的broker 是否包含现有分区的副本。如果有,控制器就把变更通知发送给新加入的broker 和其他broker,
新broker 上的副本开始从首领那里复制消息。简而言之, Kafka 使用Zookeeper的临时节点来选举控制器, 并在节点加入集群或退出集群时通知控制器。控制器负责在节点加入或离开集群时进行分区首领选举。控制器使用epoch 来避免“脑裂” 。“脑裂”是指两个节点同时认为自己是当前的控制器。
上面的过程说明了broker怎么选举为controller,以及当前的控制器宕之后,新的controller怎么选举过程!在zk上查看控制器的相关信息,如下!
[zk: localhost:(CONNECTED) ] get /controller
{"version":,"brokerid":,"timestamp":""} #显示了当前那个broker是首领
cZxid = 0x600000142
ctime = Fri Dec :: CST
mZxid = 0x600000142
mtime = Fri Dec :: CST
pZxid = 0x600000142
cversion =
dataVersion =
aclVersion =
ephemeralOwner = 0x367ed5d87210001
dataLength =
numChildren = [zk: localhost:(CONNECTED) ] get /controller_epoch
27 #这个反应了首领的变更次数
cZxid = 0x200000017
ctime = Thu Dec :: CST
mZxid = 0x600000143
mtime = Fri Dec :: CST
pZxid = 0x200000017
cversion =
dataVersion =
aclVersion =
ephemeralOwner = 0x0
dataLength =
numChildren =
kafka复制
复制功能是Kafka 架构的核心。在Kafka 的文档里, Kafka 把自己描述成“ 一个分布式的、可分区的、可复制的提交日志服务”。复制之所以这么关键,是因为它可以在个别节点失效时仍能保证Kafka 的可用性和持久性。
Kafka 使用主题来组织数据,每个主题被分为若干个分区,每个分区有多个副本。那些副本被保存在broker 上,每个broker 可以保存成百上千个属于不同主题和分区的副本。
副本有以下两种类型。
- 首领副本
每个分区都有一个首领副本。为了保证一致性,所有生产者请求和消费者请求都会经过这个副本。
- 跟随者副本
首领以外的副本都是跟随者副本。跟随者副本不处理来自客户端的请求,它们唯一的任务就是从首领那里复制消息,保持与首领一致的状态。如果首领发生崩渍,其中的一个
跟随者会被提升为新首领。
首领的另一个任务是搞清楚哪个跟随者的状态与自己是一致的。跟随者为了保持与首领的状态一致、在有新消息到达时尝试从首领那里复制消息,不过有各种原因会导致同步失败。例如,网络拥塞导致复制变慢, broker 发生崩横导致复制滞后,直到重启brok er 后复制才会继续。为了与首领保持同步,跟随者向首领发送获取数据的请求,这种请求与悄费者为了读取悄息而发送的请求是一样的。首领将响应消息发给跟随者。请求消息里包含了跟随者想要获取消息的偏移量,而且这些偏移量总是有序的。
通过查看每个跟随者请求的最新偏移量,首领就会知道每个跟随者复制的进度。如果跟随者在10 s 内没有请求任何消息,或者虽然在请求消息,但在10s 内没有请求最新的数据,那么它就会被认为是不同步的。如果一个副本无陆与首领保持一致,在首领发生失效时,它就不可能成为新首领一一毕竟它没有包含全部的消息。相反,持续请求得到的最新悄息副本被称为同步的副本。在首领发生失效时,只有同步副本才有可能被选为新首领。
除了当前首领之外,每个分区都有一个首选首领创建主题时选定的首领就是分区的首选首领。之所以把它叫作首选首领,是因为在创建分区时,需要在b roker 之间均衡首领
(后面会介绍在broker 间分布副本和首领的算怯)。因此,我们希望首选首领在成为真正的首领时, broker 间的负载最终会得到均衡。默认情况下,Kafka auto.leader.rebalance被设为true ,它会检查首选首领是不是当前首领, 如果不是,并且该副本是同步的,那么就会触发首领选举,让首选首领成为当前首领。
kafka处理请求
broker 的大部分工作是处理客户端、分区副本和控制器发送给分区首领的请求。Kafka 提供了一个二进制协议(基于TCP ),指定了请求消息的格式以及broker 如何对请求作出响应一一包括成功处理请求或在处理请求过程中遇到错误。客户端发起连接并发送请求,broker 处理请求井作出响应。broker 按照请求到达的顺序来处理它们一一这种顺序保证让Kafka 具有了消息队列的特性,同时保证保存的消息也是有序的。
broker 会在它所监听的每一个端口上运行一个acceptor线程,这个钱程会创建一个连接,并把它交给processor线程去处理。processor线程(也被叫作“网络线程”)的数量是可配置的。网络线程负责从客户端获取请求悄息,把它们放进请求队列,然后从响应队列获取响应消息,把它们发送给客户端。请求消息被放到请求队列后,IO 线程会负责处理它们。
生产请求和获取请求都必须发送给分区的首领副本。如果broker 收到一个针对特定分区的请求,而该分区的首领在另一个broker 上,那么发送请求的客户端会收到一个“非分区首领”的错误响应。当针对特定分区的获取请求被发送到一个不含有该分区首领的broker上,也会出现同样的错误。Kafka 客户端要自己负责把生产请求和获取请求发送到正确的broker 上。
那么客户端怎么知道该往哪里发送请求呢?客户端使用了另一种请求类型,也就是元数据请求。这种请求包含了客户端感兴趣的主题列表。服务器端的响应消息里指明了这些主题所包含的分区、每个分区都有哪些副本, 以及哪个副本是首领。元数据请求可以发送给任意一个broker ,因为所有broker 都缓存了这些信息。
一般情况下,客户端会把这些信息缓存起来,并直接往目标broker 上发送生产请求和获取请求。它们需要时不时地通过发送元数据请求来刷新这些信息(刷新的时间间隔通
过meta.max.age.ms参数来配置),从而知道元数据是否发生了变更一一比如,在新broker 加入集群时,部分副本会被移动到新的broker 上。另外,如果客户端收到“非首领”错误,它会在尝试重发请求之前先刷新元数据,因为这个错误说明了客户端正在使用过期的元数据信息,之前的请求被发到了错误的broker 上。
生产者的请求:生产者的请求与acks参数配置的数值有关,这个参数指定了需要经过多个broker确定才算消息写入成功。
之后,悄息被写入本地磁盘。在Linux 系统上,消息会被写到文件系统缓存里,并不保证它们何时会被刷新到磁盘上。Kafl< a 不会一直等待数据被写到磁盘上一一它依赖复制功能来保证消息的持久性。
在消息被写入分区的首领之后, broker 开始检查acks 配置参数一一如果acks 被设为0 或1,那么broker 立即返回响应;如果ac k s 被设为all ,那么请求会被保存在一个叫作炼狱的缓冲区里,直到首领发现所有跟随者副本都复制了消息,晌应才会被返回给客户端。
消费者请求:
请求需要先到达指定的分区首领上,然后客户端通过查询元数据来确保请求的路由是正确的。首领在收到请求时,它会先检查请求是否有效,然后broker 将按照客户端指定的数量上限从分区里读取消息,再把消息返回给客户端。Kafka 使用零复制技术向客户端发送消息一一也就是说, Kafka 直接把消息从文件(或者更确切地说是Linux文件系统缓存)里发送到网络通道,而不需要经过任何中间缓冲区。这是Kafka 与其他大部分数据库系统不一样的地方,其他数据库在将数据发送给客户端之前会先把它们保存在本地缓存里。这项技术避免了字节复制,也不需要管理内存缓冲区,从而获得更好的性能。
客户端可以是设置broker最多可以从一个分区返回多少数据,这个限制非常重要,因为客户端需要为broker返回的数据分配足够的内存。如果没有这个限制,broker返回的大量数据可能耗尽客户端内存。也可以设置返回数据的下限。这样在主题消息量不是很大的情况下,这样可以减少CPU和网络开销。客户端发送一个请求,broker等到有足够的数据时才把它们返回给客户端,然后客户端再发出请求,而不是让客户端每隔几毫秒就发送一个请求。
当然,我们不会让客户端一直等待broker 累积数据。在等待了一段时间之后,就可以把可用的数据拿回处理,而不是一直等待下去。所以,客户端可以定义一个超时时间,告
诉broker :“如果你无告在X 毫秒内累积满足要求的数据量,那么就把当前这些数据返回给我。"
有意思的是,并不是所有保存在分区首领上的数据都可以被客户端读取。大部分客户端只能读取已经被写入所有同步副本的悄息(跟随者副本也不行,尽管它们也是消费者否
则复制功能就无陆工作)。分区首领知道每个消息会被复制到哪个副本上,在消息还没有被写入所有同步副本之前,是不会发送给消费者的一一尝试获取这些消息的请求会得到空的响应而不是错误。
因为还没有被足够多副本复制的消息被认为是“不安全”的一一如果首领发生崩愤,另一个副本成为新首领,那么这些消息就丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。试想, 一个悄费者读取并处理了这样的一个消息,而另一个消费者发现这个消息其实并不存在。所以,我们会等到所有同步副本复制了这些消息,才允许消费者读取它们 。这也意味着,如果broker 间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可以通过参数replica.lag.time.max.ms来配置,它指定了副本在复制消息时可被允许的最大延迟时间。
kafka的物理存储
暂无