KafkaRequestHandlerPool的逻辑比较简单,就是开启num.io.threads个KafkaRequestHandler,每个KafkaRequestHandler从RequestChannel
. requestQueue 接受request,然后把对应的response存进responseQueues(i)队列
class KafkaRequestHandlerPool(val brokerId: Int, val requestChannel: RequestChannel, val apis: KafkaApis, numThreads: Int) extends Logging with KafkaMetricsGroup { /* a meter to track the average free capacity of the request handlers */ private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS) this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], " val threads = new Array[Thread](numThreads) val runnables = new Array[KafkaRequestHandler](numThreads) for(i <- 0 until numThreads) {//创建num.io.threads个KafkaRequestHandler runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis) threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i)) threads(i).start() } def shutdown() { info("shutting down") for(handler <- runnables) handler.shutdown for(thread <- threads) thread.join info("shut down completely") } }
class KafkaRequestHandler(id: Int, brokerId: Int, val aggregateIdleMeter: Meter, val totalHandlerThreads: Int, val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging { this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], " def run() { while(true) { try { var req : RequestChannel.Request = null while (req == null) { // We use a single meter for aggregate idle percentage for the thread pool. // Since meter is calculated as total_recorded_value / time_window and // time_window is independent of the number of threads, each recorded idle // time should be discounted by # threads. val startSelectTime = SystemTime.nanoseconds req = requestChannel.receiveRequest(300)//从RequestChannel.requestQueue获取request val idleTime = SystemTime.nanoseconds - startSelectTime aggregateIdleMeter.mark(idleTime / totalHandlerThreads) } if(req eq RequestChannel.AllDone) { debug("Kafka request handler %d on broker %d received shut down command".format( id, brokerId)) return } req.requestDequeueTimeMs = SystemTime.milliseconds trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req)) apis.handle(req)//调用负责业务逻辑的KafkaApis进行真正的处理,然后把response存放进对应的RequestChannel. responseQueues[i] } catch { case e: Throwable => error("Exception when handling request", e) } } } def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone) }