这里用kakfa 3.5版本做源码演示
- 1、启动命令
- 2、服务端选择一种server初始化
- (1) zk初始化
- (2) raft初始化
1、启动命令
首先看一下 中的最后一行
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
我们知道了执行的是core/src/main/scala/kafka/
下的main方法
def main(args: Array[String]): Unit = {
try {
//获得配置文件中的参数
val serverProps = getPropsFromArgs(args)
//build server,非常重要,因为你要确定到底执行的是哪种server
val server = buildServer(serverProps)
//省略干扰代码。。。。
//执行server的 startup
try server.startup()
//省略干扰代码。。。。
}
//生成server
private def buildServer(props: Properties): Server = {
val config = KafkaConfig.fromProps(props, false)
if (config.requiresZookeeper) { //是否在properties中有=broker,controller配置
//没有则构造KafkaServer
new KafkaServer(
config,
Time.SYSTEM,
threadNamePrefix = None,
enableForwarding = false
)
} else {
//存在则构造KafkaRaftServer
new KafkaRaftServer(
config,
Time.SYSTEM,
threadNamePrefix = None
)
}
}
其中 就是判断
/config/kraft/
中是否有下面的配置
参考Kafka2.8无Zookeeper模式下集群部署
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
# The node id associated with this instance's roles
node.id=1
# The connect string for the controller quorum
controller.quorum.voters=1@master:9093,2@slave1:9093,3@slave2:9093
2、服务端选择一种server初始化
每一种server都要实现下面这三个接口
trait Server {
def startup(): Unit
def shutdown(): Unit
def awaitShutdown(): Unit
}
(1) zk初始化
初始化调用的是 这个类中的
startup
方法
/**
* Start up API for bringing up a single instance of the Kafka server.
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
* 启动用于启动 Kafka 服务器的单个实例的 API。实例化 LogManager、SocketServer 和请求处理程序 - KafkaRequestHandlers
*/
override def startup(): Unit = {
//省略代码。。。。。
//初始化逻辑
}
(2) raft初始化
初始化调用的是
//raft启动函数
override def startup(): Unit = {
Mx4jLoader.maybeLoad()
//这行代码使用foreach方法对controller进行迭代,如果controller不为None,则调用其startup方法。这里使用了占位符语法_.startup(),表示对每个元素执行startup方法。
//ControllerServer 对象,当节点的配置 中指定了 controller 角色时才会创建,处理元数据类请求,包括 topic 创建删除等
controller.foreach(_.startup())
//(_.startup()):与前一行代码类似,这行代码对broker进行迭代,并调用其startup方法。
//BrokerServer 对象,当节点的配置 中指定了 broker 角色时才会创建,处理消息数据类请求,例如消息的生产消费等
broker.foreach(_.startup())
AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
info(KafkaBroker.STARTED_MESSAGE)
}
(_.startup())
遍历调用的是类中的startup方法,主要是初始化控制器
(_.startup())
遍历调用的是类中的startup方法,主要是初始化每一个broker