kafka源码解析之七KafkaRequestHandlerPool

时间:2022-04-04 23:57:43
KafkaRequestHandlerPool的逻辑比较简单,就是开启num.io.threads个KafkaRequestHandler,每个KafkaRequestHandler从RequestChannel.requestQueue接受request,然后把对应的response存进responseQueues(i)队列:
/**
 * Owns a fixed set of [[KafkaRequestHandler]] workers, each running on its own
 * thread, and starts all of them as part of construction.
 *
 * @param brokerId       id of the broker, used in log prefixes and passed to every handler
 * @param requestChannel channel handed to every handler
 * @param apis           request-processing entry point handed to every handler
 * @param numThreads     number of handlers (and threads) to create; per the original
 *                       author's note this corresponds to the num.io.threads setting
 */
class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              numThreads: Int) extends Logging with KafkaMetricsGroup {

  /* One meter shared by all handlers to track the average free capacity of the pool. */
  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

  this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "

  val threads = new Array[Thread](numThreads)
  val runnables = new Array[KafkaRequestHandler](numThreads)

  // Build and start the handlers one at a time: handler i is created, wrapped in a
  // thread obtained from Utils.daemonThread, and started before handler i + 1 exists.
  threads.indices.foreach { idx =>
    val handler = new KafkaRequestHandler(idx, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
    runnables(idx) = handler
    val worker = Utils.daemonThread("kafka-request-handler-" + idx, handler)
    threads(idx) = worker
    worker.start()
  }

  /**
   * Asks every handler to shut down, then blocks until every handler thread has finished.
   * All shutdown requests are issued before the first join.
   */
  def shutdown(): Unit = {
    info("shutting down")
    runnables.foreach(_.shutdown())
    threads.foreach(_.join())
    info("shut down completely")
  }
}

/**
 * A worker that repeatedly takes a request from the request channel and passes it
 * to [[KafkaApis]] until it receives the RequestChannel.AllDone marker.
 *
 * @param id                  index of this handler, used in log messages
 * @param brokerId            id of the broker, used in log messages
 * @param aggregateIdleMeter  meter shared across the pool that records idle time
 * @param totalHandlerThreads number of handler threads in the pool; used to scale
 *                            each recorded idle interval
 * @param requestChannel      channel that requests are received from
 * @param apis                component that handles each received request
 */
class KafkaRequestHandler(id: Int,
                          brokerId: Int,
                          val aggregateIdleMeter: Meter,
                          val totalHandlerThreads: Int,
                          val requestChannel: RequestChannel,
                          apis: KafkaApis) extends Runnable with Logging {
  this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "

  /**
   * Main loop: wait for a request, stop on the AllDone marker, otherwise stamp the
   * dequeue time and hand the request to `apis`. Any Throwable raised during an
   * iteration is logged and the loop carries on with the next iteration.
   */
  def run(): Unit = {
    var keepRunning = true
    while (keepRunning) {
      try {
        val request = awaitRequest()
        if (request eq RequestChannel.AllDone) {
          debug("Kafka request handler %d on broker %d received shut down command".format(
            id, brokerId))
          keepRunning = false
        } else {
          request.requestDequeueTimeMs = SystemTime.milliseconds
          trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, request))
          // Per the original author's note: KafkaApis carries out the actual business
          // logic and stores the response in the matching RequestChannel.responseQueues[i].
          apis.handle(request)
        }
      } catch {
        case e: Throwable => error("Exception when handling request", e)
      }
    }
  }

  /**
   * Polls the request channel until it yields a non-null request, recording the time
   * spent in each poll on the shared idle meter.
   */
  private def awaitRequest(): RequestChannel.Request = {
    var polled: RequestChannel.Request = null
    while (polled == null) {
      // A single meter tracks aggregate idle percentage for the whole thread pool.
      // The meter computes total_recorded_value / time_window, and time_window does
      // not depend on the number of threads, so each recorded idle interval is
      // divided by the thread count.
      val pollStartNs = SystemTime.nanoseconds
      // Per the original author's note this reads from RequestChannel.requestQueue;
      // 300 is presumably a poll timeout in milliseconds -- TODO confirm.
      polled = requestChannel.receiveRequest(300)
      val idleNs = SystemTime.nanoseconds - pollStartNs
      aggregateIdleMeter.mark(idleNs / totalHandlerThreads)
    }
    polled
  }

  /** Requests shutdown by sending the RequestChannel.AllDone marker through the request channel. */
  def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
}