kafka源码之kafkaserver的启动

时间:2023-01-03 00:01:34

KAFKA的启动

Kafka启动时,通过进入kafkabin路径下,执行如下脚本:

./kafka-server-start.sh ../config/server.properties

 

这个脚本会启动Kafka类的实例,并执行main函数,传入的参数是server.properties的路径.

def main(args: Array[String]): Unit = {
  try {

加载对应的server.properties配置文件,并生成Properties实例.
    val serverProps = getPropsFromArgs(args)

这里生成一个KafkaServer的实例,这个实例生成时,会在实例中同时生成一个KafkaServer的实例,

生成KafkaServer实例前,需要先通过serverProps生成出一个KafkaConfig的实例.
    val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

添加对kill操作的勾子函数.用于处理,如果直接kill时关闭kafkaserver.
    // attach shutdown handler to catch control-c
    
Runtime.getRuntime().addShutdownHook(new Thread() {
      override def run() = {
        kafkaServerStartable.shutdown
      }
    })
启动并等待server停止.
    kafkaServerStartable.startup
    kafkaServerStartable.awaitShutdown
  }
  catch {
    case e: Throwable =>
      fatal(e)
      System.exit(1)
  }
  System.exit(0)
}

 

根据properties生成server实例

KafkaServerStartable.fromProps(serverProps)函数调用时,也就是kakfa启动时,

new KafkaServerStartable(KafkaConfig.fromProps(serverProps))

KafkaServerStartable实例生成时,会生成KafkaServer实例:

  class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
         private val server new KafkaServer(serverConfig)

 

KafkaConfig.fromProps(serverProps)的函数调用流程:

def fromProps(props: Properties): KafkaConfig =
  fromProps(propstrue)

def fromProps(props: PropertiesdoLog: Boolean): KafkaConfig =
  new KafkaConfig(propsdoLog)

 

 

KafkaServer实例用于对所有的组件进行统一的初始化与启动.

KafkaServer的启动

Kafkamain函数中执行startup,会调用KafkaServer实例中的startup函数.

KafkaServer的实例生成时,会在每个logDir的目录下生成一个meta.properties配置文件,

这个文件中主要记录有这个kafka的版本与broker.id的值.

KafkaServer的实例启动时,会生成kafka对外服务的socket server与相关组件,并对其进行启动.

 

在执行startup函数时,下面分析下这个函数的具体的执行流程:

1,设置brokerState的状态为Starting的状态.

brokerState.newState(Starting)

 

2,启动kafka的调度器,这个KafkaScheduler的实例生成时需要得到background.threads配置的值,默认是10个,用于配置后台线程池的个数.

/* start scheduler */
kafkaScheduler
.startup()

 

3,初始化与zookeeper的连接,

这里需要的配置项:

配置项zookeeper.connect,默认值localhost:2181.用于设置kafka连接的zookeeper的连接地址.

配置项zookeeper.session.timeout.ms,默认值6000ms,用于控制zk的session的超时时间,可设置为同步时间的2倍或3倍.

配置项zookeeper.connection.timeout.ms,默认值6000ms,用于配置连接zk的连接超时时间.

配置项zookeeper.sync.time.ms,默认值2000ms,用于与zk进行同步的时间间隔,

配置项zookeeper.set.acl,是否启用zookeeper的acl控制,默认值为false,表示不启用.

 

这里得到的zkUtils实例是一个ZkUtils的实例,在实例生成后,会判断zk中是否存在如下地址,如果不存在,会创建对应的路径在zk上.

路径/consumers,这个路径用于消费者的client.id存储对应消费的offset的路径.

路径/brokers/ids,这个路径用于存储所有的broker id的路径.

路径/brokers/topics,用于存储每个broker对应的topics的信息,

路径/config/changes,还不知道,后期用到在说.

路径/config/topics,还不知道,后期用到在说.

路径/config/clients,还不知道,后期用到在说.

路径/admin/delete_topics,用于存储删除的topic的信息.

路径/brokers/seqid,还不知道,后期用到在说.

路径/isr_change_notification,这个用于在kafka的副本broker发生变化时用于通知的存储路径.

/* setup zookeeper */
zkUtils 
= initZk()

 

4,初始化创建并启动LogManager的实例,

/* start log manager */
logManager 
= createLogManager(zkUtils.zkClientbrokerState)
logManager.startup()

 

5,得到当前配置文件中的brokerId的信息.

如果broker.id的配置没有配置(小于0的值时),同时broker.id.generation.enable配置为true,默认也就是true,这个时候根据zk中/brokers/seqid路径的version值,第一次从0开始,每次增加.并加上reserved.broker.max.id配置的值,默认是1000,来充当这个server的broker.id,同时把这个broker.id更新到logDir目录下的meta.properties文件中,下次读取时,直接读取这个配置文件中的broker.id的值,而不需要重新进行创建.

/* generate brokerId */
config.brokerId =  getBrokerId
this.logIdent "[Kafka Server " + config.brokerId "], "

 

6,生成并启动kafka的SocketServer.

socketServer new SocketServer(configmetricskafkaMetricsTime)
socketServer.startup()

 

7,生成并启动ReplicaManager,此实例依赖kafkaScheduler与logManager实例.

/* start replica manager */
replicaManager 
new ReplicaManager(configmetricstimekafkaMetricsTime

  zkUtilskafkaSchedulerlogManager,
  isShuttingDown)
replicaManager.startup()

 

8,生成并启动KafkaController实例,此使用用于控制当前的broker中的所有的leader的partition的操作.

/* start kafka controller */
kafkaController 
new KafkaController(configzkUtilsbrokerState,

    kafkaMetricsTimemetricsthreadNamePrefix)
kafkaController.startup()

 

9,生成并启动GroupCoordinator的实例,这个是0.9新加入的一个玩意,用于对consumer中新加入的与partition的检查,并对partition与consumer进行平衡操作.

/* start kafka coordinator */
consumerCoordinator 
= GroupCoordinator.create(configzkUtilsreplicaManager)
consumerCoordinator.startup()

 

10,根据authorizer.class.name配置项配置的Authorizer的实现类,生成一个用于认证的实例,用于对用户的操作进行认证.这个默认为不认证.

/* Get the authorizer and initialize it if one is specified.*/
authorizer 
Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
  val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
  authZ.configure(config.originals())
  authZ
}

 

11,生成用于对外对外提供服务的KafkaApis实例,并设置当前的broker的状态为运行状态.

/* start processing requests */
apis 
new KafkaApis(socketServer.requestChannelreplicaManager

   consumerCoordinator,
   kafkaControllerzkUtilsconfig.brokerIdconfigmetadataCachemetrics,

   authorizer)
requestHandlerPool new KafkaRequestHandlerPool(config.brokerIdsocketServer.requestChannelapisconfig.numIoThreads)
brokerState.newState(RunningAsBroker)

 

12,生成动态配置修改的处理管理,主要是topic修改与client端配置的修改,并把已经存在的clientid对应的配置进行修改.

/* start dynamic config manager */
dynamicConfigHandlers 
Map[StringConfigHandler](

ConfigType.Topic -> new TopicConfigHandler(logManager),
    ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers)

)
// 
TODO: Move this logic to DynamicConfigManager
AdminUtils.fetchAllEntityConfigs(zkUtilsConfigType.Client).foreach {
  case (clientIdproperties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientIdproperties)
}
// Create the config manager. start listening to notifications
dynamicConfigManager 
new DynamicConfigManager(zkUtilsdynamicConfigHandlers)
dynamicConfigManager.startup()

 

13,生成kafka的心跳检查处理工具,这里需要使用到listeners的配置,根据是否在IAAS的环境下,需要使用到advertised相关配置,

如果advertised.listeners配置项存在,直接使用配置的listener,

否则,如果advertised.host.name配置项或者advertised.port配置项存在,使用这两个配置项,并使用明文传输(PLAINTEXT://host:port),如果advertised.port没有配置,直接使用port的配置,host可以没有设置

最后,如果上面的不都满足,直接使用listeners的配置.默认是PLAINTEXT://:port

/* tell everyone we are alive */
val listeners = config.advertisedListeners.map {case(protocolendpoint) =>
  if (endpoint.port == 0)
    (protocolEndPoint(endpoint.hostsocketServer.boundPort(protocol)endpoint.protocolType))
  else
    
(protocolendpoint)
}
kafkaHealthcheck new KafkaHealthcheck(config.brokerIdlistenerszkUtils)
kafkaHealthcheck.startup()