Every Kafka broker creates a KafkaController when it starts up, and each controller then registers itself with ZooKeeper. The first one to register becomes the leader; the rest are followers. When the controller fails and can no longer manage the cluster, the follower KafkaControllers compete to become the new leader.
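At its core, the election is a race to create the same ephemeral ZooKeeper node: whoever creates it first is the leader, and everyone else sees the node already exists. A minimal sketch of that "first registration wins" semantics, using an in-memory AtomicReference in place of the ZooKeeper node (all names here are illustrative, not Kafka's actual API):

```scala
import java.util.concurrent.atomic.AtomicReference

// Stand-in for the ephemeral "/controller" node: holds the leader's broker id, or -1 if vacant.
val controllerNode = new AtomicReference[Int](-1)

// Each broker "registers" by trying to create the node; only the first attempt succeeds.
def tryRegister(brokerId: Int): Boolean = controllerNode.compareAndSet(-1, brokerId)

val results = Seq(0, 1, 2).map(id => id -> tryRegister(id))
// Exactly one broker wins; the rest observe the existing leader and become followers.
println(results)            // List((0,true), (1,false), (2,false))
println(controllerNode.get) // 0
```

The ephemeral flavor of the node is what makes failover automatic: when the leader's ZooKeeper session dies, the node is deleted and the race starts over.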
The KafkaController's startup logic lives in its startup method:
First, it registers a SessionExpirationListener that watches the connection between the KafkaController and ZooKeeper. When the session expires and a new connection is established, the listener's handleNewSession method is invoked.
Then it starts the ZookeeperLeaderElector:
def startup() = {
  inLock(controllerContext.controllerLock) {
    info("Controller starting up")
    // Register a session-expiration listener
    registerSessionExpirationListener()
    isRunning = true
    // Call ZookeeperLeaderElector's startup method
    controllerElector.startup
    info("Controller startup complete")
  }
}
Let's first analyze the ZookeeperLeaderElector class.
1 Core fields
controllerContext: ControllerContext — the KafkaController's context information
electionPath: String — the ZooKeeper path used for the election
onBecomingLeader — a callback invoked when this broker becomes the controller leader (failover)
onResigningAsLeader — a callback that performs cleanup when this broker resigns as leader
leaderId: Int — caches the id of the current controller leader
leaderChangeListener: LeaderChangeListener — watches the '/controller' node for data changes; when the leaderId stored there changes, the LeaderChangeListener reacts accordingly
2 Important methods
2.1 The startup method
def startup {
  inLock(controllerContext.controllerLock) {
    // Watch for data changes on the '/controller' node
    controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
    // Start the election
    elect
  }
}
2.2 The elect method
def elect: Boolean = {
  val timestamp = SystemTime.milliseconds.toString
  val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
  // Read the controller leader id currently recorded in ZooKeeper
  leaderId = getControllerID
  // If a leader already exists, stop the election; amILeader tells us whether it is this broker
  if(leaderId != -1) {
    debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
    return amILeader
  }
  try {
    // Create the ephemeral node (with session checks); throws if the node already exists
    val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath, electString,
      controllerContext.zkUtils.zkConnection.getZookeeper, JaasUtils.isZkSecurityEnabled())
    zkCheckedEphemeral.create()
    info(brokerId + " successfully elected as leader")
    // Update the cached leader id
    leaderId = brokerId
    // On success, invoke the failover callback
    onBecomingLeader()
  } catch {
    case e: ZkNodeExistsException =>
      leaderId = getControllerID
      if (leaderId != -1)
        debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
      else
        warn("A leader has been elected but just resigned, this will result in another round of election")
    case e2: Throwable =>
      error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
      resign()
  }
  amILeader
}
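The payload elect writes to '/controller' is a small JSON blob containing the version, the winning brokerid, and a timestamp; the LeaderChangeListener later parses the brokerid back out of it (via KafkaController.parseControllerId). A minimal sketch of that round trip, with a hand-rolled regex extraction standing in for Kafka's Json utility:

```scala
// Roughly what elect writes to "/controller":
val electString = """{"version":1,"brokerid":2,"timestamp":"1500000000000"}"""

// Minimal extraction of the brokerid field (illustrative; Kafka uses its own Json helper).
def parseControllerId(data: String): Int = {
  val pattern = """"brokerid"\s*:\s*(\d+)""".r
  pattern.findFirstMatchIn(data).map(_.group(1).toInt).getOrElse(-1)
}

println(parseControllerId(electString)) // 2
```

Returning -1 when no brokerid is present mirrors the convention in elect, where leaderId == -1 means "no leader currently recorded".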
As we can see, onBecomingLeader is KafkaController's onControllerFailover method:
def onControllerFailover() {
  if(isRunning) {
    info("Broker %d starting become controller state transition".format(config.brokerId))
    // Read the controller epoch from the "/controller_epoch" path in ZooKeeper
    readControllerEpochFromZookeeper()
    // Increment the controller epoch and write it back to ZooKeeper
    incrementControllerEpoch(zkUtils.zkClient)
    // Register the ReassignedPartitionsListener
    registerReassignedPartitionsListener()
    // Register the IsrChangeNotificationListener
    registerIsrChangeNotificationListener()
    // Register the preferred-replica election listener
    registerPreferredReplicaElectionListener()
    // Register the TopicChangeListener and DeleteTopicsListener
    partitionStateMachine.registerListeners()
    // Register the BrokerChangeListener
    replicaStateMachine.registerListeners()
    // Initialize the ControllerContext: read topics, partitions, replicas, etc. from ZooKeeper
    initializeControllerContext()
    // Start the replica state machine and initialize the state of all replicas
    replicaStateMachine.startup()
    // Start the partition state machine and initialize the state of all partitions
    partitionStateMachine.startup()
    // Register a PartitionModificationListener for every existing topic
    controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
    info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
    // Handle partitions whose replicas are being reassigned
    maybeTriggerPartitionReassignment()
    // Handle partitions pending a preferred-replica election
    maybeTriggerPreferredReplicaElection()
    // Send an UpdateMetadataRequest to the other brokers in the cluster
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
    // Start automatic partition leader rebalancing, if enabled in the config
    if (config.autoLeaderRebalanceEnable) {
      info("starting the partition rebalance scheduler")
      autoRebalanceScheduler.startup()
      autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
        5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
    }
    // Start the TopicDeletionManager, which starts the DeleteTopicThread underneath
    deleteTopicManager.start()
  }
  else
    info("Controller has been shut down, aborting startup/failover")
}
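The epoch increment at the top of onControllerFailover is a conditional write: the update carries the zkVersion that was read, so if another controller has written "/controller_epoch" in the meantime, the version no longer matches and the write fails. The optimistic-concurrency pattern can be sketched in plain Scala, with a versioned in-memory cell standing in for the znode (illustrative names, not Kafka's API):

```scala
import java.util.concurrent.atomic.AtomicReference

// Stand-in for the "/controller_epoch" znode: (epoch, zkVersion).
val epochNode = new AtomicReference[(Int, Int)]((41, 7))

// Conditional update: succeeds only if the stored version matches what we read earlier.
def conditionalUpdate(newEpoch: Int, expectedVersion: Int): Boolean = {
  val current = epochNode.get
  if (current._2 == expectedVersion)
    epochNode.compareAndSet(current, (newEpoch, expectedVersion + 1))
  else
    false
}

val (epoch, version) = epochNode.get
println(conditionalUpdate(epoch + 1, version)) // true: our version still matches
println(conditionalUpdate(epoch + 1, version)) // false: the version has moved on
println(epochNode.get)                         // (42,8)
```

A stale controller that lost this race knows another controller has taken over with a higher epoch, which is how Kafka fences "zombie" controllers.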
Next, let's look at the LeaderChangeListener.
When the leader information in ZooKeeper changes, this method is invoked and the new leader id is cached in memory:
def handleDataChange(dataPath: String, data: Object) {
  inLock(controllerContext.controllerLock) {
    val amILeaderBeforeDataChange = amILeader
    // Cache the new leader's id
    leaderId = KafkaController.parseControllerId(data.toString)
    info("New leader is %d".format(leaderId))
    // The old leader must resign, i.e. go from controller leader to follower
    if (amILeaderBeforeDataChange && !amILeader)
      onResigningAsLeader()
  }
}
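The condition amILeaderBeforeDataChange && !amILeader is simply a leader-to-follower transition check: resign only if this broker was the leader before the change and is not after it. Isolated into a hypothetical helper (not in the Kafka source) to make the three cases explicit:

```scala
// Decide whether this broker must resign after a change to "/controller".
// A resign is needed only on a leader -> follower transition.
def mustResign(brokerId: Int, leaderBefore: Int, leaderAfter: Int): Boolean =
  leaderBefore == brokerId && leaderAfter != brokerId

println(mustResign(brokerId = 1, leaderBefore = 1, leaderAfter = 3)) // true: we lost leadership
println(mustResign(brokerId = 1, leaderBefore = 2, leaderAfter = 3)) // false: we were never leader
println(mustResign(brokerId = 1, leaderBefore = 1, leaderAfter = 1)) // false: still leader
```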
When the controller leader becomes a follower, some cleanup work is needed:
def onControllerResignation() {
  debug("Controller resigning, broker id %d".format(config.brokerId))
  // Deregister the listeners registered on ZooKeeper
  deregisterIsrChangeNotificationListener()
  deregisterReassignedPartitionsListener()
  deregisterPreferredReplicaElectionListener()
  // Shut down the TopicDeletionManager
  if (deleteTopicManager != null)
    deleteTopicManager.shutdown()
  // Shut down the leader-rebalance scheduled task
  if (config.autoLeaderRebalanceEnable)
    autoRebalanceScheduler.shutdown()
  inLock(controllerContext.controllerLock) {
    // Deregister all ReassignedPartitionsIsrChangeListeners
    deregisterReassignedPartitionsIsrChangeListeners()
    // Shut down the partition and replica state machines
    partitionStateMachine.shutdown()
    replicaStateMachine.shutdown()
    // Shut down the ControllerChannelManager, closing connections to the other brokers
    if(controllerContext.controllerChannelManager != null) {
      controllerContext.controllerChannelManager.shutdown()
      controllerContext.controllerChannelManager = null
    }
    // Reset the ControllerContext and switch the broker state
    controllerContext.epoch = 0
    controllerContext.epochZkVersion = 0
    brokerState.newState(RunningAsBroker)
    info("Broker %d resigned as the controller".format(config.brokerId))
  }
}
When the data in the "/controller" node is deleted, the handleDataDeleted method is triggered to handle it:
def handleDataDeleted(dataPath: String) {
  inLock(controllerContext.controllerLock) {
    debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
      .format(brokerId, dataPath))
    if(amILeader)
      onResigningAsLeader()
    elect // re-elect
  }
}
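Putting handleDataDeleted together with elect: when the ephemeral node vanishes (for example, the leader's ZooKeeper session expires), every surviving broker races to recreate it, so leadership can move to a different broker. A sketch of the two rounds, again with an in-memory cell standing in for the znode (illustrative, not Kafka's API):

```scala
import java.util.concurrent.atomic.AtomicReference

val node = new AtomicReference[Int](-1) // -1 means "/controller" is vacant
def elect(brokerId: Int): Int = { node.compareAndSet(-1, brokerId); node.get }

// Round 1: broker 0 wins; brokers 1 and 2 see the existing leader.
Seq(0, 1, 2).foreach(elect)
println(node.get) // 0

// The leader's session expires: ZooKeeper deletes the ephemeral node,
// handleDataDeleted fires on the survivors, and they re-elect.
node.set(-1)
Seq(1, 2).foreach(elect)
println(node.get) // 1
```

This is why elect is written to be safe to call repeatedly: it first checks whether a leader already exists and only then attempts to create the node.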