/**
 * NIO socket server for a broker: one acceptor thread plus `numProcessorThreads`
 * processor threads, all wired to a shared [[RequestChannel]].
 *
 * The constructor only allocates state; threads are created in `startup()`.
 */
class SocketServer(val brokerId: Int,
                   val host: String,
                   val port: Int,
                   val numProcessorThreads: Int,
                   val maxQueuedRequests: Int,
                   val sendBufferSize: Int,
                   val recvBufferSize: Int,
                   val maxRequestSize: Int = Int.MaxValue,
                   val maxConnectionsPerIp: Int = Int.MaxValue,
                   val connectionsMaxIdleMs: Long,
                   val maxConnectionsPerIpOverrides: Map[String, Int]) extends Logging with KafkaMetricsGroup {

  this.logIdent = "[Socket Server on Broker " + brokerId + "], "

  private val time = SystemTime
  private val processors = new Array[Processor](numProcessorThreads)
  @volatile private var acceptor: Acceptor = null

  // Channel shared by every processor thread for handing off requests and picking up responses.
  val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)

  /* a meter to track the average free capacity of the network processors */
  private val aggregateIdleMeter = newMeter("NetworkProcessorAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

  /**
   * Start the socket server: create and start the processor threads, register the
   * metrics gauge and the response listener, then start the acceptor thread and
   * wait for it to report startup.
   */
  def startup(): Unit = {
    val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

    // Start one Processor thread per slot (num.network.threads of them) to handle network requests.
    processors.indices.foreach { i =>
      val perProcessorIdleMeter =
        newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString))
      processors(i) = new Processor(i,
                                    time,
                                    maxRequestSize,
                                    aggregateIdleMeter,
                                    perProcessorIdleMeter,
                                    numProcessorThreads,
                                    requestChannel,
                                    quotas,
                                    connectionsMaxIdleMs)
      Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
    }

    // Gauge value: sum over all processors of countInterestOps(OP_WRITE).
    newGauge("ResponsesBeingSent", new Gauge[Int] {
      def value = processors.map(_.countInterestOps(SelectionKey.OP_WRITE)).sum
    })

    // Register the processor threads for notification of responses: when a response
    // becomes available for processor `processorId`, the listener wakes that processor up.
    requestChannel.addResponseListener((processorId: Int) => processors(processorId).wakeup())

    // Start accepting connections.
    this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
    Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
    acceptor.awaitStartup
    info("Started")
  }
}
Acceptor作为一个独立的线程存在,当接收到网络连接请求的时候,以轮询(round-robin)的方式把连接交给其中一个Processor线程,由该线程处理这个连接之后的所有request。
/**
 * Server thread that listens on `host:port` and hands each accepted connection
 * to one of the given processors, cycling through them in round-robin order.
 *
 * NOTE(review): `openServerSocket` and `accept` are not defined in this excerpt;
 * per the original author's note, `accept` adds the connection to the chosen
 * processor's new-connection queue so that processor handles all of its later
 * requests — confirm against their definitions.
 */
private[kafka] class Acceptor(val host: String,
                              val port: Int,
                              private val processors: Array[Processor],
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) {

  val serverChannel = openServerSocket(host, port)

  /**
   * Accept loop that checks for new connection attempts
   */
  def run(): Unit = {
    serverChannel.register(selector, SelectionKey.OP_ACCEPT)
    startupComplete()

    // Index of the processor that receives the next accepted connection.
    var nextProcessor = 0

    while (isRunning) {
      // Wait at most 500 ms so that the isRunning flag is re-checked regularly.
      val readyCount = selector.select(500)
      if (readyCount > 0) {
        val pending = selector.selectedKeys().iterator()
        while (pending.hasNext && isRunning) {
          try {
            val key = pending.next
            pending.remove()
            if (!key.isAcceptable)
              throw new IllegalStateException("Unrecognized key state for acceptor thread.")
            accept(key, processors(nextProcessor))

            // round robin to the next processor thread
            nextProcessor = (nextProcessor + 1) % processors.length
          } catch {
            // Log and keep looping; a failure on one key must not stop the acceptor.
            case e: Throwable => error("Error while accepting connection", e)
          }
        }
      }
    }

    debug("Closing server socket and selector.")
    swallowError(serverChannel.close())
    swallowError(selector.close())
    shutdownComplete()
  }
}
那么Processor线程是如何处理request的呢?关键在于requestChannel,它作为request和response的传输通道,使得Processor线程只负责接收connection的request和发送相应的response,而和真实的业务逻辑无关,且看requestChannel:
/**
 * Holds the queues that connect the network processor threads with the request
 * handler threads: one bounded request queue shared by everyone, and one
 * unbounded response queue per processor.
 *
 * @param numProcessors number of per-processor response queues to create
 * @param queueSize     capacity of the shared request queue
 */
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {

  private var responseListeners: List[(Int) => Unit] = Nil

  // A single blocking request queue; per the original author's note, the
  // KafkaRequestHandler threads later take requests from it.
  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)

  // One blocking response queue per processor (num.network.threads of them); per the
  // original author's note, the KafkaRequestHandler threads put responses into them.
  private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)

  responseQueues.indices.foreach { slot =>
    responseQueues(slot) = new LinkedBlockingQueue[RequestChannel.Response]()
  }
}
即Processor线程将各自对应的connection的request都存放进requestQueue中,然后分别从对应的responseQueues(i)中获取对应的request的response,如下图:
代码如下:
/**
 * Network thread that services the connections assigned to it: it reads complete
 * requests from sockets and forwards them to the [[RequestChannel]], and writes
 * the responses attached to selection keys back to the sockets.
 *
 * NOTE(review): `configureNewConnections`, `processNewResponses`, `close`,
 * `closeAll`, `channelFor`, `lruConnections`, `currentTimeNanos` and
 * `maybeCloseOldestConnection` are used here but defined outside this excerpt.
 */
private[kafka] class Processor(val id: Int,
                               val time: Time,
                               val maxRequestSize: Int,
                               val aggregateIdleMeter: Meter,
                               val idleMeter: Meter,
                               val totalProcessorThreads: Int,
                               val requestChannel: RequestChannel,
                               connectionQuotas: ConnectionQuotas,
                               val connectionsMaxIdleMs: Long) extends AbstractServerThread(connectionQuotas) {

  /**
   * Main loop: set up queued connections, register pending responses, select with
   * a 300 ms timeout, record idle time, then dispatch every selected key to
   * `read`, `write` or `close`. Any exception while handling a key closes that key.
   */
  override def run(): Unit = {
    startupComplete()
    while (isRunning) {
      // setup any new connections that have been queued up
      configureNewConnections()
      // register any new responses for writing (taken from this processor's response queue)
      processNewResponses()

      val selectStartNanos = SystemTime.nanoseconds
      val readyCount = selector.select(300)
      currentTimeNanos = SystemTime.nanoseconds
      val idleTime = currentTimeNanos - selectStartNanos
      idleMeter.mark(idleTime)
      // A single meter tracks the aggregate idle percentage of the whole thread pool.
      // The meter is computed as total_recorded_value / time_window, and time_window
      // does not depend on the number of threads, so each recorded idle time is
      // discounted by the number of threads.
      aggregateIdleMeter.mark(idleTime / totalProcessorThreads)

      trace("Processor id " + id + " selection time = " + idleTime + " ns")

      if (readyCount > 0) {
        val selected = selector.selectedKeys().iterator()
        while (selected.hasNext && isRunning) {
          // Declared outside the try block because the catch handlers need the key.
          var key: SelectionKey = null
          try {
            key = selected.next
            selected.remove()
            if (key.isReadable)
              read(key) // pull the request bytes from the connection
            else if (key.isWritable)
              write(key) // send the response for an earlier request
            else if (!key.isValid)
              close(key)
            else
              throw new IllegalStateException("Unrecognized key state for processor thread.")
          } catch {
            case e: EOFException =>
              info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
              close(key)
            case e: InvalidRequestException =>
              info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage))
              close(key)
            case e: Throwable =>
              error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e)
              close(key)
          }
        }
      }
      maybeCloseOldestConnection
    }

    debug("Closing selector.")
    closeAll()
    swallowError(selector.close())
    shutdownComplete()
  }

  /**
   * Read from the channel behind `key`. Closes the key when the read returns a
   * negative count; when the receive is complete, sends the request to the
   * request channel and stops listening for reads on this key; otherwise
   * re-registers the key for reading.
   */
  def read(key: SelectionKey): Unit = {
    lruConnections.put(key, currentTimeNanos)
    val socketChannel = channelFor(key)

    // Reuse the partially filled receive attached to the key, or attach a fresh one.
    val receive: Receive =
      if (key.attachment == null) {
        val fresh = new BoundedByteBufferReceive(maxRequestSize)
        key.attach(fresh)
        fresh
      } else {
        key.attachment.asInstanceOf[Receive]
      }

    val bytesRead = receive.readFrom(socketChannel)
    val address = socketChannel.socket.getRemoteSocketAddress()
    trace(bytesRead + " bytes read from " + address)

    if (bytesRead < 0) {
      close(key)
    } else if (receive.complete) {
      // Assemble the request and hand it to the request channel.
      val req = RequestChannel.Request(processor = id,
                                       requestKey = key,
                                       buffer = receive.buffer,
                                       startTimeMs = time.milliseconds,
                                       remoteAddress = address)
      requestChannel.sendRequest(req)
      key.attach(null)
      // explicitly reset interest ops to not READ, no need to wake up the selector just yet
      key.interestOps(key.interestOps & (~SelectionKey.OP_READ))
    } else {
      // more reading to be done
      trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress())
      key.interestOps(SelectionKey.OP_READ)
      wakeup()
    }
  }

  /**
   * Write the response attached to `key` to its channel. When the send is
   * complete, updates the request metrics, clears the attachment and switches the
   * key back to reading; otherwise keeps the key registered for writing.
   *
   * Throws IllegalStateException when the attached response carries no send.
   */
  def write(key: SelectionKey): Unit = {
    val socketChannel = channelFor(key)
    val response = key.attachment().asInstanceOf[RequestChannel.Response]
    val responseSend = response.responseSend // payload of the response
    if (responseSend == null)
      throw new IllegalStateException("Registered for write interest but no response attached to key.")

    // Push the response bytes to the socket that owns this connection.
    val written = responseSend.writeTo(socketChannel)
    trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " using key " + key)

    if (responseSend.complete) {
      response.request.updateRequestMetrics()
      key.attach(null)
      trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress())
      key.interestOps(SelectionKey.OP_READ)
    } else {
      trace("Did not finish writing, registering for write again on connection " + socketChannel.socket.getRemoteSocketAddress())
      key.interestOps(SelectionKey.OP_WRITE)
      wakeup()
    }
  }
}