Kafka Controller Initialization and Failover

Date: 2021-05-12 12:42:11

Every Kafka broker creates a KafkaController at startup and registers it with ZooKeeper. The first one to register becomes the controller leader; all the others are followers. When the current controller fails and can no longer manage the cluster, the follower KafkaControllers compete to become the new leader.
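
At its core the election is simply "whoever creates the ephemeral /controller znode first wins". As a minimal standalone sketch of that idea (not Kafka's actual code), using the plain ZooKeeper client and a hypothetical tryBecomeController helper:

import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper}

// Sketch of the election idea: the first broker to create the ephemeral
// "/controller" node wins; everyone else gets NodeExistsException and stays a
// follower. Because the node is ephemeral, it disappears when the leader's
// ZooKeeper session dies, which is what triggers a new election.
def tryBecomeController(zk: ZooKeeper, brokerId: Int): Boolean = {
  try {
    zk.create("/controller", brokerId.toString.getBytes("UTF-8"),
      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
    true                                                   // this broker is now the leader
  } catch {
    case _: KeeperException.NodeExistsException => false   // someone else already is
  }
}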

The KafkaController is started in its startup method:

First: register a SessionExpirationListener, which monitors the connection between the KafkaController and ZooKeeper. When the session expires and a new connection is established, the listener's handleNewSession method is triggered (a sketch of this listener follows the startup code below).

Then: start the ZookeeperLeaderElector.

def startup() = {
  inLock(controllerContext.controllerLock) {
    info("Controller starting up")
    // Register the session-expiration listener
    registerSessionExpirationListener()
    isRunning = true
    // Call ZookeeperLeaderElector's startup method
    controllerElector.startup
    info("Controller startup complete")
  }
}
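
The session-expiration listener itself is not shown in this post. Roughly, when a new session is established after an expiration, the old controller state is discarded and the election is retried. A hedged sketch, assuming the I0Itec IZkStateListener interface used by this Kafka version (the real class is an inner class of KafkaController and its exact body may differ):

import org.I0Itec.zkclient.IZkStateListener
import org.apache.zookeeper.Watcher.Event.KeeperState

// Hedged sketch of the listener registered by registerSessionExpirationListener().
class SessionExpirationListener extends IZkStateListener {
  override def handleStateChanged(state: KeeperState): Unit = {
    // Nothing to do on ordinary state changes; only a brand-new session matters here.
  }

  override def handleNewSession(): Unit = {
    // The old session (and with it the old ephemeral "/controller" node) is gone:
    // drop any leader-side state and take part in a fresh election.
    inLock(controllerContext.controllerLock) {
      onControllerResignation()
      controllerElector.elect
    }
  }

  override def handleSessionEstablishmentError(error: Throwable): Unit = {
    // The original code logs a fatal error here.
  }
}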

 

Let's first look at the ZookeeperLeaderElector class.

1 Core fields

controllerContext: ControllerContext, the KafkaController's context information

electionPath: String, the ZooKeeper path used for the election

onBecomingLeader: a callback invoked when this broker becomes leader; the controller passes in its failover function

onResigningAsLeader: a callback that performs cleanup work when this broker resigns as leader

leaderId: Int, caches the id of the current controller leader

leaderChangeListener: LeaderChangeListener, watches the '/controller' node for data changes; when the leaderId stored there changes, the LeaderChangeListener reacts accordingly
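
Put together, the class declaration looks roughly like this (simplified; the exact signature may vary slightly between Kafka versions):

class ZookeeperLeaderElector(controllerContext: ControllerContext,
                             electionPath: String,            // "/controller"
                             onBecomingLeader: () => Unit,    // failover callback
                             onResigningAsLeader: () => Unit, // cleanup callback
                             brokerId: Int)
  extends LeaderElector with Logging {

  // Cached id of the current controller leader; -1 means "no leader known"
  var leaderId = -1
  // Watches the election path for data changes and deletions
  val leaderChangeListener = new LeaderChangeListener
}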

2 Key methods

2.1 The startup method

def startup {
  inLock(controllerContext.controllerLock) {
    // Watch the '/controller' node for data changes
    controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
    // Start the election
    elect
  }
}

 

2.2 The elect method (election)

def elect: Boolean = {
  val timestamp = SystemTime.milliseconds.toString
  val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
  // Read the controller leader id currently recorded in ZooKeeper
  leaderId = getControllerID
  // If a leader already exists there is nothing to elect; just report whether this broker is it
  if(leaderId != -1) {
     debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
     return amILeader
  }

  try {
    // Create the ephemeral node (with session checks); throws if the node already exists
    val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath, electString,
      controllerContext.zkUtils.zkConnection.getZookeeper, JaasUtils.isZkSecurityEnabled())
    zkCheckedEphemeral.create()
    info(brokerId + " successfully elected as leader")
    // Update the cached leader id
    leaderId = brokerId
    // On success, invoke the failover callback
    onBecomingLeader()
  } catch {
    case e: ZkNodeExistsException =>
      // Someone else created the node first; re-read the leader id
      leaderId = getControllerID
      if (leaderId != -1)
        debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
      else
        warn("A leader has been elected but just resigned, this will result in another round of election")
    case e2: Throwable =>
      error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
      resign()
  }
  amILeader
}
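
elect relies on two small helpers that are not shown above: amILeader simply compares the cached leaderId with this broker's id, and getControllerID reads the current leader id from the election path. A sketch of roughly how they look in this version (details may differ):

// Is this broker the currently cached leader?
def amILeader: Boolean = leaderId == brokerId

// Read the leader id from the "/controller" node; -1 if the node does not exist
private def getControllerID(): Int = {
  controllerContext.zkUtils.readDataMaybeNull(electionPath)._1 match {
    case Some(controller) => KafkaController.parseControllerId(controller)
    case None => -1
  }
}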

 

As we can see, elect invokes onBecomingLeader, which is the KafkaController's onControllerFailover method:

def onControllerFailover() {
  if(isRunning) {
    info("Broker %d starting become controller state transition".format(config.brokerId))
    // Read the controller epoch from ZooKeeper (the "/controller_epoch" path)
    readControllerEpochFromZookeeper()
    // Increment the controller epoch and write it back to ZooKeeper
    incrementControllerEpoch(zkUtils.zkClient)
    // Register the ReassignedPartitionsListener
    registerReassignedPartitionsListener()
    // Register the IsrChangeNotificationListener
    registerIsrChangeNotificationListener()
    // Register the preferred-replica election listener
    registerPreferredReplicaElectionListener()
    // Register the TopicChangeListener and the topic-deletion listener
    partitionStateMachine.registerListeners()
    // Register the BrokerChangeListener
    replicaStateMachine.registerListeners()
    // Initialize the ControllerContext: read topic, partition and replica info from ZooKeeper
    initializeControllerContext()
    // Start the replica state machine and initialize the state of all replicas
    replicaStateMachine.startup()
    // Start the partition state machine and initialize the state of all partitions
    partitionStateMachine.startup()
    // Register a PartitionModificationListener for every existing topic
    controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
    info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
    // Handle partitions whose replicas are being reassigned
    maybeTriggerPartitionReassignment()
    // Handle partitions awaiting a preferred-replica election
    maybeTriggerPreferredReplicaElection()
    // Send an UpdateMetadataRequest to the other brokers in the cluster
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
    // Depending on configuration, enable automatic partition leader rebalancing
    if (config.autoLeaderRebalanceEnable) {
      info("starting the partition rebalance scheduler")
      autoRebalanceScheduler.startup()
      autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
        5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
    }
    // Start the TopicDeletionManager, which starts the DeleteTopicThread underneath
    deleteTopicManager.start()
  }
  else
    info("Controller has been shut down, aborting startup/failover")
}

 

Next, let's look at the LeaderChangeListener.

When the leader information stored in ZooKeeper changes, this callback fires and records the new leader in memory:

def handleDataChange(dataPath: String, data: Object) {
  inLock(controllerContext.controllerLock) {
    val amILeaderBeforeDataChange = amILeader
    // Record the new leader id
    leaderId = KafkaController.parseControllerId(data.toString)
    info("New leader is %d".format(leaderId))
    // The old leader has to resign, i.e. step down from controller leader to follower
    if (amILeaderBeforeDataChange && !amILeader)
      onResigningAsLeader()
  }
}
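
handleDataChange uses KafkaController.parseControllerId to pull the broker id out of the JSON written to "/controller" during elect (the electString above). A sketch of roughly what it does, including a fallback for the old plain-integer format (details may differ):

def parseControllerId(controllerInfoString: String): Int = {
  try {
    Json.parseFull(controllerInfoString) match {
      case Some(m) =>
        // New format: {"version":1,"brokerid":...,"timestamp":"..."}
        val controllerInfo = m.asInstanceOf[Map[String, Any]]
        controllerInfo("brokerid").asInstanceOf[Int]
      case None =>
        throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
    }
  } catch {
    case _: Throwable =>
      // Older versions stored the broker id as a plain integer
      controllerInfoString.toInt
  }
}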

 

When a controller leader becomes a follower, it needs to do some cleanup work:

def onControllerResignation() {
  debug("Controller resigning, broker id %d".format(config.brokerId))
  // Unregister the listeners registered on ZooKeeper
  deregisterIsrChangeNotificationListener()
  deregisterReassignedPartitionsListener()
  deregisterPreferredReplicaElectionListener()

  // Shut down the TopicDeletionManager
  if (deleteTopicManager != null)
    deleteTopicManager.shutdown()
  // Shut down the leader-rebalance scheduled task
  if (config.autoLeaderRebalanceEnable)
    autoRebalanceScheduler.shutdown()
  inLock(controllerContext.controllerLock) {
    // Unregister all ReassignedPartitionsIsrChangeListeners
    deregisterReassignedPartitionsIsrChangeListeners()
    // Shut down the partition and replica state machines
    partitionStateMachine.shutdown()
    replicaStateMachine.shutdown()
    // Shut down the ControllerChannelManager and disconnect from the other brokers
    if(controllerContext.controllerChannelManager != null) {
      controllerContext.controllerChannelManager.shutdown()
      controllerContext.controllerChannelManager = null
    }
    // Reset the ControllerContext and switch the broker state
    controllerContext.epoch = 0
    controllerContext.epochZkVersion = 0
    brokerState.newState(RunningAsBroker)

    info("Broker %d resigned as the controller".format(config.brokerId))
  }
}

 

当"/controller"节点中的数据被删除时会触发handleDataDeleted方法进行处理

def handleDataDeleted(dataPath: String) {
  inLock(controllerContext.controllerLock) {
    debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
      .format(brokerId, dataPath))
    // If this broker was the leader, resign first, then take part in the new election
    if(amILeader)
      onResigningAsLeader()
    elect // re-elect
  }
}