apache kafka系列之源码分析走读-kafkaApi详解

时间:2020-11-29 16:46:11

Kafka源码中数据交互流程

apache kafka系列之源码分析走读-kafkaApi详解

图1

1.概述

kafka启动时做很多初始化运行环境工作,具体请参考:apache kafka系列之源码分析走读-kafka内部模块分析

其中SockeServer类启动时,首先初始化NIO网络环境、启动监听、创建主线程、工作线程池、设置参数等等。

从上图1中可以看到整个交互过程中,kafka的所有逻辑处理和交互实际是交给KafkaApi类来处理的。

通过请求的类型,把具体的request路由到对应的handler处理。目前kafka并没有把handler抽象出来,

而是每个handler都是一个函数,混在KafkaApi类中。

2. Request请求类别

kafka-0.8.1版本中定义了10种类型请求,请求类型说明如下:

参数

说明(解释)

请求二进制数据解码类

RequestKeys.ProduceKey

producer请求

ProducerRequest

RequestKeys.FetchKey

consumer请求

FetchRequest

RequestKeys.OffsetsKey

topicoffset请求

OffsetRequest

RequestKeys.MetadataKey

topic元数据请求

TopicMetadataRequest

RequestKeys.LeaderAndIsrKey

leaderisr信息更新请求

LeaderAndIsrRequest

RequestKeys.StopReplicaKey

停止replica请求

StopReplicaRequest

RequestKeys.UpdateMetadataKey

更新元数据请求

UpdateMetadataRequest

RequestKeys.ControlledShutdownKey

controlledShutdown请求

ControlledShutdownRequest

RequestKeys.OffsetCommitKey

commitOffset请求

OffsetCommitRequest

RequestKeys.OffsetFetchKey

consumeroffset请求

OffsetFetchRequest


下面是KafkaApi中handle方法代码:

  def handle(request: RequestChannel.Request) {
    try{
      trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
      request.requestId match {
        case RequestKeys.ProduceKey => handleProducerRequest(request)  // producer
        case RequestKeys.FetchKey => handleFetchRequest(request)       // consumer
        case RequestKeys.OffsetsKey => handleOffsetRequest(request)
        case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
        case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) //成为leader或follower设置同步副本组信息
        case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
        case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
        case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)  //shutdown broker
        case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
        case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
        case requestId => throw new KafkaException("Unknown api code " + requestId)
      }
    } catch {
      case e: Throwable =>
        request.requestObj.handleError(e, requestChannel, request)
        error("error when handling request %s".format(request.requestObj), e)
    } finally
      request.apiLocalCompleteTimeMs = SystemTime.milliseconds
  }

3.请求交互二进制数据格式

kafka中客户端与server端交互有多种类型,那它是怎么交互数据呢,格式是怎样?下面来揭开面纱。
请求交互二进制数据 组成为:请求类型 + 请求数据。
apache kafka系列之源码分析走读-kafkaApi详解

3.1 ProducerRequest二进制格式

apache kafka系列之源码分析走读-kafkaApi详解

3.2 FetchRequest二进制格式

apache kafka系列之源码分析走读-kafkaApi详解
3.3 OffsetRequest二进制格式

apache kafka系列之源码分析走读-kafkaApi详解
3.4 TopicMetadataRequest二进制格式

apache kafka系列之源码分析走读-kafkaApi详解
3.5 LeaderAndIsrRequest二进制格式

apache kafka系列之源码分析走读-kafkaApi详解
3.6 StopReplicaRequest二进制格式

apache kafka系列之源码分析走读-kafkaApi详解
3.7 UpdateMetadataRequest二进制格式

apache kafka系列之源码分析走读-kafkaApi详解
3.8 ControlledShutdownRequest二进制格式

apache kafka系列之源码分析走读-kafkaApi详解
3.9 OffsetCommitRequest二进制格式

apache kafka系列之源码分析走读-kafkaApi详解
3.10 OffsetFetchRequest二进制格式

apache kafka系列之源码分析走读-kafkaApi详解