apache kafka系列之源码分析走读-kafka内部模块分析

时间:2022-03-15 07:17:32

apache kafka中国社区QQ群:162272557

kafka整体结构分析:

kafka源代码工程目录结构如下图:

apache kafka系列之源码分析走读-kafka内部模块分析

下面只对core目录结构作说明,其他都是测试类或java客户端代码

 

admin   --管理员模块,操作和管理topic,paritions相关,包含create,delete topic,扩展 patitions

Api       --该模块主要负责交互数据的组装,客户端与服务端交互数据编解码

client    --该模块比较简单,就一个类,Producer读取kafka broker元数据信息,

topic和partitions,以及leader

cluster   --该模块包含几个实体类,Broker,Cluster,Partition,Replica,解释他们之间关系: Cluster由多个broker组成,一个Broker包含多个partition,一个topic的所有

partitions分布在不同broker的中,一个Replica包含多个Partition。

common     --通用模块,只包含异常类和错误验证

consumer    --consumer处理模块,负责所有客户端消费者数据和逻辑处理

contoroller  --负责*控制器选举,partition的leader选举,副本分配,副本重新分配,

partition和replica扩容。

javaapi   --提供java的producer和consumer接口api

log          --Kafka文件存储模块,负责读写所有kafka的topic消息数据。

message    --封装多个消息组成一个“消息集”或压缩消息集。

metrics    --内部状态的监控模块

network        --网络事件处理模块,负责处理和接收客户端连接

producer        --producer实现模块,包括同步和异步发送消息。

serializer        --序列化或反序列化当前消息

kafka         --kafka门面入口类,副本管理,topic配置管理,leader选举实现(由contoroller模块调用)。

tools           --一看这就是工具模块,包含内容比较多:

a.导出对应consumer的offset值.

b.导出LogSegments信息,当前topic的log写的位置信息.

c.导出zk上所有consumer的offset值.

d.修改注册在zk的consumer的offset值.

f.producer和consumer的使用例子.

utils   --Json工具类,Zkutils工具类,Utils创建线程工具类,KafkaScheduler公共调度器类,公共日志类等等。



1.kafka启动类:kafka.scala

kafka为kafka broker的main启动类,其主要作用为加载配置,启动report服务(内部状态的监控),注册释放资源的钩子,以及门面入口类。

kafka类代码如下:

......

 try {
      val props = Utils.loadProps(args(0))          //加载配置文件

      val serverConfig = new KafkaConfig(props)
      KafkaMetricsReporter.startReporters(serverConfig.props)             //启动report服务(内部状态的监控)
      val kafkaServerStartble = new KafkaServerStartable(serverConfig)    //kafka server核心入口类
      // attach shutdown handler to catch control-c
      Runtime.getRuntime().addShutdownHook(new Thread() {                 //钩子程序,当jvm退出前,销毁所有资源
        override def run() = {
          kafkaServerStartble.shutdown
        }
      })


      kafkaServerStartble.startup
      kafkaServerStartble.awaitShutdown
    }

......

KafkaServerStartble类包装了KafkaSever类,其实啥都没有做。只是调用包装类而已

KafkaSever类是kafka broker运行控制的核心入口类,它是采用门面模式设计的。

apache kafka系列之源码分析走读-kafka内部模块分析


kafka中KafkaServer类,采用门面模式,是网络处理,io处理等得入口.

ReplicaManager   副本管理

KafkaApis    处理所有request的Proxy类,根据requestKey决定调⽤用具体的handler

KafkaRequestHandlerPool 处理request的线程池,请求处理池  <-- num.io.threads io线程数量

LogManager    kafka文件存储系统管理,负责处理和存储所有Kafka的topic的partiton数据

TopicConfigManager 监听此zk节点的⼦子节点/config/changes/,通过LogManager更新topic的配置信息,topic粒度配置管理,具体请查看topic级别配置

KafkaHealthcheck 监听zk session expire,在zk上创建broker信息,便于其他broker和consumer获取其信息

KafkaController kafka集群*控制器选举,leader选举,副本分配。

KafkaScheduler 负责副本管理和日志管理调度等等

ZkClient         负责注册zk相关信息.

BrokerTopicStats topic信息统计和监控

ControllerStats          *控制器统计和监控


KafkaServer部分主要代码如下:

......  
def startup() {
info("starting")
isShuttingDown = new AtomicBoolean(false)
shutdownLatch = new CountDownLatch(1)

/* start scheduler */
kafkaScheduler.startup()

/* setup zookeeper */
zkClient = initZk()

/* start log manager */
logManager = createLogManager(zkClient)
logManager.startup()

socketServer = new SocketServer(config.brokerId,
config.hostName,
config.port,
config.numNetworkThreads,
config.queuedMaxRequests,
config.socketSendBufferBytes,
config.socketReceiveBufferBytes,
config.socketRequestMaxBytes)
socketServer.startup()

replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
kafkaController = new KafkaController(config, zkClient)

/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)

Mx4jLoader.maybeLoad()

replicaManager.startup()

kafkaController.startup()

topicConfigManager = new TopicConfigManager(zkClient, logManager)
topicConfigManager.startup()

/* tell everyone we are alive */
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
kafkaHealthcheck.startup()


registerStats()
startupComplete.set(true);
info("started")
}

private def initZk(): ZkClient = {
info("Connecting to zookeeper on " + config.zkConnect)
val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
ZkUtils.setupCommonPaths(zkClient)
zkClient
}

/**
* Forces some dynamic jmx beans to be registered on server startup.
*/
private def registerStats() {
BrokerTopicStats.getBrokerAllTopicsStats()
ControllerStats.uncleanLeaderElectionRate
ControllerStats.leaderElectionTimer
}
.......