kafka源码解析之六SocketServer

时间:2022-01-12 23:57:54
/**
 * Entry point of the broker's network layer. `startup()` creates `numProcessorThreads`
 * Processor threads plus one Acceptor thread, and wires the processors to the shared
 * `requestChannel` so that they are woken up when a response is available.
 */
class SocketServer(val brokerId: Int,
                   val host: String,
                   val port: Int,
                   val numProcessorThreads: Int,
                   val maxQueuedRequests: Int,
                   val sendBufferSize: Int,
                   val recvBufferSize: Int,
                   val maxRequestSize: Int = Int.MaxValue,
                   val maxConnectionsPerIp: Int = Int.MaxValue,
                   val connectionsMaxIdleMs: Long,
                   val maxConnectionsPerIpOverrides: Map[String, Int] ) extends Logging with KafkaMetricsGroup {
  this.logIdent = "[Socket Server on Broker " + brokerId + "], "
  private val time = SystemTime
  private val processors = new Array[Processor](numProcessorThreads)
  @volatile private var acceptor: Acceptor = null
  val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)

  /* a meter to track the average free capacity of the network processors */
  private val aggregateIdleMeter = newMeter("NetworkProcessorAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

  /**
   * Start the socket server: launch the processor threads, register the metrics gauge and
   * the response listener, then launch the acceptor and wait until it has started.
   */
  def startup(): Unit = {
    val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

    // start one Processor thread per configured network thread (num.network.threads)
    for (index <- 0 until numProcessorThreads) {
      val processorIdleMeter =
        newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> index.toString))
      val processor = new Processor(index,
                                    time,
                                    maxRequestSize,
                                    aggregateIdleMeter,
                                    processorIdleMeter,
                                    numProcessorThreads,
                                    requestChannel,
                                    quotas,
                                    connectionsMaxIdleMs)
      processors(index) = processor
      Utils.newThread("kafka-network-thread-%d-%d".format(port, index), processor, false).start()
    }

    // gauge: sum over all processors of countInterestOps(OP_WRITE)
    newGauge("ResponsesBeingSent", new Gauge[Int] {
      def value = processors.foldLeft(0)(_ + _.countInterestOps(SelectionKey.OP_WRITE))
    })

    // register the processor threads for notification of responses: when a response is
    // available for processor `processorId`, that processor's selector is woken up
    requestChannel.addResponseListener((processorId: Int) => processors(processorId).wakeup())

    // start accepting connections
    this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
    Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
    acceptor.awaitStartup
    info("Started")
  }
}

Acceptor作为一个独立的线程存在,当接收到网络连接请求的时候,以轮询(round-robin)的方式把该连接交给其中一个Processor线程,由这个Processor负责处理该连接之后的所有request

/**
 * Thread that listens on the server socket and hands every accepted connection to one of
 * the processor threads, cycling through them in round-robin order.
 */
private[kafka] class Acceptor(val host: String,
                              val port: Int,
                              private val processors: Array[Processor],
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) {
  val serverChannel = openServerSocket(host, port)

  /**
   * Accept loop that checks for new connection attempts
   */
  def run(): Unit = {
    serverChannel.register(selector, SelectionKey.OP_ACCEPT)
    startupComplete()
    var nextProcessor = 0
    while (isRunning) {
      // wait at most 500 ms per select so the isRunning flag is re-checked regularly
      if (selector.select(500) > 0) {
        val pendingKeys = selector.selectedKeys().iterator()
        while (pendingKeys.hasNext && isRunning) {
          try {
            val key = pendingKeys.next()
            pendingKeys.remove()
            if (!key.isAcceptable)
              throw new IllegalStateException("Unrecognized key state for acceptor thread.")
            // hand the connection to the current processor; presumably that processor then
            // owns every request on this connection (accept() is not shown here)
            accept(key, processors(nextProcessor))
            // round robin to the next processor thread (only reached when accept succeeded)
            nextProcessor = (nextProcessor + 1) % processors.length
          } catch {
            case e: Throwable => error("Error while accepting connection", e)
          }
        }
      }
    }
    debug("Closing server socket and selector.")
    swallowError(serverChannel.close())
    swallowError(selector.close())
    shutdownComplete()
  }
}
那么Processor线程是如何处理request的呢?关键在于requestChannel,它作为request和response的传输通道,使得Processor线程只负责接收connection的request和发送相应的response,而和真实的业务逻辑无关,且看requestChannel
/**
 * Holds the queues through which requests and responses are exchanged: a single request
 * queue of capacity `queueSize` and one response queue per processor.
 */
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
  private var responseListeners: List[Int => Unit] = Nil
  // one bounded blocking queue for all requests; presumably consumed by the
  // KafkaRequestHandler threads (consumer side not shown here)
  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
  // one blocking response queue per processor (num.network.threads of them); presumably
  // filled by the KafkaRequestHandler threads (producer side not shown here)
  private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
  responseQueues.indices.foreach { processorId =>
    responseQueues(processorId) = new LinkedBlockingQueue[RequestChannel.Response]()
  }
}
即Processor线程将各自对应的connection的request都存放进requestQueue中,然后分别从对应的responseQueues(i)中获取对应的request的response,如下图:

kafka源码解析之六SocketServer

代码如下:
/**
 * Thread that services a set of connections: it reads complete requests from the sockets
 * and passes them to `requestChannel`, and it writes the responses attached to selection
 * keys back to the sockets.
 */
private[kafka] class Processor(val id: Int,
                               val time: Time,
                               val maxRequestSize: Int,
                               val aggregateIdleMeter: Meter,
                               val idleMeter: Meter,
                               val totalProcessorThreads: Int,
                               val requestChannel: RequestChannel,
                               connectionQuotas: ConnectionQuotas,
                               val connectionsMaxIdleMs: Long) extends AbstractServerThread(connectionQuotas) {

  /**
   * Select loop. Each iteration configures newly queued connections, registers new
   * responses, selects for up to 300 ms, records the time spent in select on the idle
   * meters, and dispatches every ready key to `read`, `write` or `close`. Any exception
   * raised while handling a key is logged and the key's connection is closed. When the
   * loop ends, all connections and the selector are closed.
   */
  override def run(): Unit = {
    startupComplete()
    while(isRunning) {
      // setup any new connections that have been queued up
      configureNewConnections()
      // register any new responses for writing; presumably taken from this processor's
      // response queue (helper not shown here)
      processNewResponses()
      val startSelectTime = SystemTime.nanoseconds
      val ready = selector.select(300)
      currentTimeNanos = SystemTime.nanoseconds
      val idleTime = currentTimeNanos - startSelectTime
      idleMeter.mark(idleTime)
      // We use a single meter for aggregate idle percentage for the thread pool.
      // Since meter is calculated as total_recorded_value / time_window and
      // time_window is independent of the number of threads, each recorded idle
      // time should be discounted by # threads.
      aggregateIdleMeter.mark(idleTime / totalProcessorThreads)

      trace("Processor id " + id + " selection time = " + idleTime + " ns")
      if(ready > 0) {
        val keys = selector.selectedKeys()
        val iter = keys.iterator()
        while(iter.hasNext && isRunning) {
          var key: SelectionKey = null
          try {
            key = iter.next
            iter.remove()
            if(key.isReadable)
              read(key) // receive (part of) a request from the connection
            else if(key.isWritable)
              write(key) // send (part of) the response for an earlier request
            else if(!key.isValid)
              close(key)
            else
              throw new IllegalStateException("Unrecognized key state for processor thread.")
          } catch {
            case e: EOFException => {
              info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
              close(key)
            }
            case e: InvalidRequestException => {
              info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage))
              close(key)
            }
            case e: Throwable => {
              error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e)
              close(key)
            }
          }
        }
      }
      maybeCloseOldestConnection
    }
    debug("Closing selector.")
    closeAll()
    swallowError(selector.close())
    shutdownComplete()
  }

  /**
   * Read from the connection behind `key`. The in-progress receive is kept as the key's
   * attachment and created on first use. A negative read count closes the connection.
   * Once the receive is complete it is wrapped in a `RequestChannel.Request` and sent to
   * `requestChannel`, the attachment is cleared and read interest is removed from the key.
   *
   * @param key selection key of the readable connection
   */
  def read(key: SelectionKey): Unit = {
    lruConnections.put(key, currentTimeNanos)
    val socketChannel = channelFor(key)
    var receive = key.attachment.asInstanceOf[Receive]
    if(receive == null) {
      // first read of a new request on this connection
      receive = new BoundedByteBufferReceive(maxRequestSize)
      key.attach(receive)
    }
    val bytesRead = receive.readFrom(socketChannel)
    val address = socketChannel.socket.getRemoteSocketAddress()
    trace(bytesRead + " bytes read from " + address)
    if(bytesRead < 0) {
      close(key)
    } else if(receive.complete) {
      // assemble the request and hand it to the request channel
      val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)
      requestChannel.sendRequest(req)
      key.attach(null)
      // explicitly reset interest ops to not READ, no need to wake up the selector just yet
      key.interestOps(key.interestOps & (~SelectionKey.OP_READ))
    } else {
      // more reading to be done
      trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress())
      key.interestOps(SelectionKey.OP_READ)
      wakeup()
    }
  }

  /**
   * Write the response attached to `key` to its connection. When the send is complete the
   * request metrics are updated, the attachment is cleared and the key is switched back to
   * read interest; otherwise the key stays registered for write.
   *
   * @param key selection key of the writable connection
   * @throws IllegalStateException if the key has no response, or no response payload, attached
   */
  def write(key: SelectionKey): Unit = {
    val socketChannel = channelFor(key)
    val response = key.attachment().asInstanceOf[RequestChannel.Response]
    // attachment() is null when nothing is attached: check it before dereferencing so the
    // intended IllegalStateException is raised rather than a NullPointerException
    if(response == null || response.responseSend == null)
      throw new IllegalStateException("Registered for write interest but no response attached to key.")
    val responseSend = response.responseSend
    val written = responseSend.writeTo(socketChannel)
    trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " using key " + key)
    if(responseSend.complete) {
      response.request.updateRequestMetrics()
      key.attach(null)
      trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress())
      key.interestOps(SelectionKey.OP_READ)
    } else {
      trace("Did not finish writing, registering for write again on connection " + socketChannel.socket.getRemoteSocketAddress())
      key.interestOps(SelectionKey.OP_WRITE)
      wakeup()
    }
  }
}