KAFKA的启动
Kafka启动时,通过进入kafka的bin路径下,执行如下脚本:
./kafka-server-start.sh ../config/server.properties
这个脚本会启动Kafka类的实例,并执行main函数,传入的参数是server.properties的路径.
def main(args: Array[String]): Unit = {
try {
加载对应的server.properties配置文件,并生成Properties实例.
val serverProps = getPropsFromArgs(args)
这里生成一个KafkaServer的实例,这个实例生成时,会在实例中同时生成一个KafkaServer的实例,
生成KafkaServer实例前,需要先通过serverProps生成出一个KafkaConfig的实例.
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
添加对kill操作的勾子函数.用于处理,如果直接kill时关闭kafkaserver.
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread() {
override def run() = {
kafkaServerStartable.shutdown
}
})
启动并等待server停止.
kafkaServerStartable.startup
kafkaServerStartable.awaitShutdown
}
catch {
case e: Throwable =>
fatal(e)
System.exit(1)
}
System.exit(0)
}
根据properties生成server实例
在KafkaServerStartable.fromProps(serverProps)函数调用时,也就是kakfa启动时,
new KafkaServerStartable(KafkaConfig.fromProps(serverProps))
KafkaServerStartable实例生成时,会生成KafkaServer实例:
class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
private val server = new KafkaServer(serverConfig)
KafkaConfig.fromProps(serverProps)的函数调用流程:
def fromProps(props: Properties): KafkaConfig =
fromProps(props, true)
def fromProps(props: Properties, doLog: Boolean): KafkaConfig =
new KafkaConfig(props, doLog)
KafkaServer实例用于对所有的组件进行统一的初始化与启动.
KafkaServer的启动
在Kafka的main函数中执行startup时,会调用KafkaServer实例中的startup函数.
在KafkaServer的实例生成时,会在每个logDir的目录下生成一个meta.properties配置文件,
这个文件中主要记录有这个kafka的版本与broker.id的值.
KafkaServer的实例启动时,会生成kafka对外服务的socket server与相关组件,并对其进行启动.
在执行startup函数时,下面分析下这个函数的具体的执行流程:
1,设置brokerState的状态为Starting的状态.
brokerState.newState(Starting)
2,启动kafka的调度器,这个KafkaScheduler的实例生成时需要得到background.threads配置的值,默认是10个,用于配置后台线程池的个数.
/* start scheduler */
kafkaScheduler.startup()
3,初始化与zookeeper的连接,
这里需要的配置项:
配置项zookeeper.connect,默认值localhost:2181.用于设置kafka连接的zookeeper的连接地址.
配置项zookeeper.session.timeout.ms,默认值6000ms,用于控制zk的session的超时时间,可设置为同步时间的2倍或3倍.
配置项zookeeper.connection.timeout.ms,默认值6000ms,用于配置连接zk的连接超时时间.
配置项zookeeper.sync.time.ms,默认值2000ms,用于与zk进行同步的时间间隔,
配置项zookeeper.set.acl,是否启用zookeeper的acl控制,默认值为false,表示不启用.
这里得到的zkUtils实例是一个ZkUtils的实例,在实例生成后,会判断zk中是否存在如下地址,如果不存在,会创建对应的路径在zk上.
路径/consumers,这个路径用于消费者的client.id存储对应消费的offset的路径.
路径/brokers/ids,这个路径用于存储所有的broker id的路径.
路径/brokers/topics,用于存储每个broker对应的topics的信息,
路径/config/changes,还不知道,后期用到在说.
路径/config/topics,还不知道,后期用到在说.
路径/config/clients,还不知道,后期用到在说.
路径/admin/delete_topics,用于存储删除的topic的信息.
路径/brokers/seqid,还不知道,后期用到在说.
路径/isr_change_notification,这个用于在kafka的副本broker发生变化时用于通知的存储路径.
/* setup zookeeper */
zkUtils = initZk()
4,初始化创建并启动LogManager的实例,
/* start log manager */
logManager = createLogManager(zkUtils.zkClient, brokerState)
logManager.startup()
5,得到当前配置文件中的brokerId的信息.
如果broker.id的配置没有配置(小于0的值时),同时broker.id.generation.enable配置为true,默认也就是true,这个时候根据zk中/brokers/seqid路径的version值,第一次从0开始,每次增加.并加上reserved.broker.max.id配置的值,默认是1000,来充当这个server的broker.id,同时把这个broker.id更新到logDir目录下的meta.properties文件中,下次读取时,直接读取这个配置文件中的broker.id的值,而不需要重新进行创建.
/* generate brokerId */
config.brokerId = getBrokerId
this.logIdent = "[Kafka Server " + config.brokerId + "], "
6,生成并启动kafka的SocketServer.
socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
socketServer.startup()
7,生成并启动ReplicaManager,此实例依赖kafkaScheduler与logManager实例.
/* start replica manager */
replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime,
zkUtils, kafkaScheduler, logManager,
isShuttingDown)
replicaManager.startup()
8,生成并启动KafkaController实例,此使用用于控制当前的broker中的所有的leader的partition的操作.
/* start kafka controller */
kafkaController = new KafkaController(config, zkUtils, brokerState,
kafkaMetricsTime, metrics, threadNamePrefix)
kafkaController.startup()
9,生成并启动GroupCoordinator的实例,这个是0.9新加入的一个玩意,用于对consumer中新加入的与partition的检查,并对partition与consumer进行平衡操作.
/* start kafka coordinator */
consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager)
consumerCoordinator.startup()
10,根据authorizer.class.name配置项配置的Authorizer的实现类,生成一个用于认证的实例,用于对用户的操作进行认证.这个默认为不认证.
/* Get the authorizer and initialize it if one is specified.*/
authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
authZ.configure(config.originals())
authZ
}
11,生成用于对外对外提供服务的KafkaApis实例,并设置当前的broker的状态为运行状态.
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager,
consumerCoordinator,
kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics,
authorizer)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
brokerState.newState(RunningAsBroker)
12,生成动态配置修改的处理管理,主要是topic修改与client端配置的修改,并把已经存在的clientid对应的配置进行修改.
/* start dynamic config manager */
dynamicConfigHandlers = Map[String, ConfigHandler](
ConfigType.Topic -> new TopicConfigHandler(logManager),
ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers)
)
// TODO: Move this logic to DynamicConfigManager
AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {
case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties)
}
// Create the config manager. start listening to notifications
dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
dynamicConfigManager.startup()
13,生成kafka的心跳检查处理工具,这里需要使用到listeners的配置,根据是否在IAAS的环境下,需要使用到advertised相关配置,
如果advertised.listeners配置项存在,直接使用配置的listener,
否则,如果advertised.host.name配置项或者advertised.port配置项存在,使用这两个配置项,并使用明文传输(PLAINTEXT://host:port),如果advertised.port没有配置,直接使用port的配置,host可以没有设置
最后,如果上面的不都满足,直接使用listeners的配置.默认是PLAINTEXT://:port
/* tell everyone we are alive */
val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>
if (endpoint.port == 0)
(protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))
else
(protocol, endpoint)
}
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils)
kafkaHealthcheck.startup()