kafka源码分析之kafka启动-SocketServer

时间:2022-04-04 23:57:37

SocketServer

说明,socketserver主要用于处理kafka server对外提交网络请求的操作,用于检查连接数,把请求添加到请求的队列中,KafkaApis提供操作支持.

实例创建与启动

socketServer = new SocketServer(configmetricskafkaMetricsTime)
socketServer.startup()

 

实例初始化生成:

得到listeners配置的值,默认是PLAINTEXT://:port,前面部分是协议,

 可配置为PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL.

private val endpoints = config.listeners

得到num.network.threads配置的处理网络请求的线程个数配置,默认是3.
private val numProcessorThreads = config.numNetworkThreads

得到queued.max.requests配置的请求队列的最大个数,默认500.
private val maxQueuedRequests = config.queuedMaxRequests

这里生成总的处理线程的个数,用于网络请求,根据listener的个数与网络线程个数的乘积.默认只有一个listener.
private val totalProcessorThreads numProcessorThreads endpoints.size
得到max.connections.per.ip配置的单机IP的最大连接个数的配置,默认不限制.
private val maxConnectionsPerIp = config.maxConnectionsPerIp

得到max.connections.per.ip.overrides配置的针对某个特别的IP的连接个数限制的重新设置值.

多个IP配置间使用逗号分开,如:host1:500,host2:600
private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides

this.logIdent "[Socket Server on Broker " + config.brokerId "], "

生成用于网络请求的管道,管道的处理线程数就是配置的线程数*listeners的个数,请求队列的大小就是配置的队列大小.
val 
requestChannel new RequestChannel(totalProcessorThreadsmaxQueuedRequests)

 

实例启动:

def startup() {
  this.synchronized {
这里根据每个ip对应的连接数限制,生成一个用于对连接数进行记数的quotas实例.
    connectionQuotas new ConnectionQuotas(maxConnectionsPerIp

       maxConnectionsPerIpOverrides)


读取如下配置项,用于生成socket中SO_SNDBUF,SO_RCVBUF等的buffer的大小.

读取socket.send.buffer.bytes配置项,默认值100kb,这个用于SOCKET发送数据的缓冲区大小.

读取socket.receive.buffer.bytes配置项默认值100kb,这个用于SOCKET的接收数据的缓冲区大小.

读取socket.request.max.bytes配置的值,这个用于设置每次请求的数据大小.默认值,100MB.

读取connections.max.idle.ms配置的值,默认为10分钟,用于设置每个连接最大的空闲回收时间.
    val sendBufferSize = config.socketSendBufferBytes
    
val recvBufferSize = config.socketReceiveBufferBytes
    
val maxRequestSize = config.socketRequestMaxBytes
    
val connectionsMaxIdleMs = config.connectionsMaxIdleMs

得到配置项broker.id的值.
    
val brokerId = config.brokerId

这里根据每一个endpoint(也就是配置的listener的协议与端口),生成处理的网络线程Processor与Acceptor实例.并启动endpoint对应的Acceptor实例.在生成Acceptor的实例时,会同时启动此实例中对应的线程处理实例数组Processor.
    
var processorBeginIndex = 0
    endpoints.values.foreach { endpoint =>
      val protocol = endpoint.protocolType
      val processorEndIndex = processorBeginIndex + numProcessorThreads

      
for (i <- processorBeginIndex until processorEndIndex) {
        processors(i) = new Processor(i,
          time,
          maxRequestSize,
          requestChannel,
          connectionQuotas,
          connectionsMaxIdleMs,
          protocol,
          config.values,
          metrics
        )
      }

      val acceptor = new Acceptor(endpointsendBufferSizerecvBufferSizebrokerId,
        processors.slice(processorBeginIndexprocessorEndIndex)connectionQuotas)
      acceptors.put(endpointacceptor)
      Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString

          endpoint.port)acceptorfalse).start()
      acceptor.awaitStartup()

      processorBeginIndex = processorEndIndex
    }
  }

  newGauge("NetworkProcessorAvgIdlePercent",
    new Gauge[Double] {
      def value = allMetricNames.map( metricName =>
        metrics.metrics().get(metricName).value()).sum / totalProcessorThreads
    
}
  )

  info("Started " + acceptors.size + " acceptor threads")
}

 

接收网络请求

SocketServer实例生成并启动后,每一个endpoint会生成对应的Acceptor的实例,这个实例中根据网络请求配置的线程个数,生成对应个数的Processor实例.

首先先看看Acceptor实例中的run函数,这个用于对网络请求的接收:

def run() {
  serverChannel.register(nioSelectorSelectionKey.OP_ACCEPT)
  startupComplete()
  try {
    var currentProcessor = 0
    while (isRunning) {
      try {

这里进行堵塞接收,最多等500ms,如果ready返回的值是0表示还没有准备好,否则表示准备就绪.表示有通道已经被注册
        val ready = nioSelector.select(500)
        if (ready > 0) {

这里得到已经准备好的网络通道的key的集合.
          val keys = nioSelector.selectedKeys()
          val iter = keys.iterator()
          while (iter.hasNext && isRunning) {
            try {
              val key = iter.next
              iter.remove()

如果selectkey已经注册到accept事件,通过accept函数与对应的线程Processor进行处理.

这里表示这个socket的通道包含有一个client端的连接请求.
              if (key.isAcceptable)
                accept(keyprocessors(currentProcessor))
              else
                throw new 
IllegalStateException("Unrecognized key state for acceptor 

                     thread.")

 

每次接收一个socket请求后,用于处理的线程进行轮询到一个线程中处理.
              // round robin to the next processor thread
              
currentProcessor = (currentProcessor + 1) % processors.length
            } catch {
              case e: Throwable => error("Error while accepting connection"e)
            }
          }
        }
      }
      catch {
        
case e: ControlThrowable => throw e
        case e: Throwable => error("Error occurred"e)
      }
    }
  } finally {
    debug("Closing server socket and selector.")
    swallowError(serverChannel.close())
    swallowError(nioSelector.close())
    shutdownComplete()
  }
}

 

接下来看看accept函数的处理流程:

def accept(key: SelectionKeyprocessor: Processor) {
  val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]

得到请求的socket通道.
  val socketChannel = serverSocketChannel.accept()
  try {

这里检查当前的IP的连接数是否已经达到了最大的连接数,如果是,直接throw too many connect.
    connectionQuotas.inc(socketChannel.socket().getInetAddress)
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setKeepAlive(true)
    socketChannel.socket().setSendBufferSize(sendBufferSize)

    .....................................


这里通过对应线程的Processor实例(Endpoint对应的Processor)来处理这个socket通道.
    processor.accept(socketChannel)
  } catch {
    case e: TooManyConnectionsException =>
      info("Rejected connection from %s, address already has the configured maximum 

           of %d connections.".format(e.ipe.count))
      close(socketChannel)
  }
}

 

然后再看看这个Processor实例中,如何去处理socket通道的请求流程:

/**
 * Queue up a new connection for reading
 */
def accept(socketChannel: SocketChannel) {

把socket通道添加到队列中.等待Processor线程的处理.
  newConnections.add(socketChannel)
  wakeup()
}

 

接下来,Processorrun函数不停的迭代时,会执行如下的处理流程:

override def run() {
  startupComplete()
  while(isRunning) {
    try {

这里取出一个Socket的通道,并注册到selector中,
      // setup any new connections that have been queued up
      
configureNewConnections()
      // register any new responses for writing
      
processNewResponses()

      try {

这里开始处理socket通道中的请求,根据如下几个流程进行处理:

1,如果请求中包含有一个isConnectable操作,把这个连接缓存起来.

2,如果请求中包含有isReadable操作.表示这个client的管道中包含有数据,需要读取,接收数据.

3,如果包含有isWriteable的操作,表示需要向client端进行写操作.

4,最后检查是否有connect被关闭的请求或connect连接空闲过期.
        selector.poll(300)
      } catch {
        case e @ (_: IllegalStateException | _: IOException) =>
          error("Closing processor %s due to illegal state or IO exception".format(id))
          swallow(closeAll())
          shutdownComplete()
          throw e
      }

 

得到对应的请求的Request的实例,并把这个Request通过SocketServer中的RequestChannel的sendRequest的函数,把请求添加到请求的队列中.等待KafkaApis来进行处理.
      selector.completedReceives.asScala.foreach { receive =>
        try {
          val channel = selector.channel(receive.source)
          val session = RequestChannel.Session(

              new KafkaPrincipal(KafkaPrincipal.USER_TYPE,

                   channel.principal.getName),
              channel.socketAddress)
          val req = RequestChannel.Request(processor = id

               connectionId = receive.sourcesession = session

               buffer = receive.payloadstartTimeMs = time.milliseconds,

               securityProtocol = protocol)

 

这里请参见KafkaApis中对requestChannel的队列请求进行处理的具体流程.
          requestChannel.sendRequest(req)
        } catch {
          case e @ (_: InvalidRequestException | _: SchemaException) =>
            
error("Closing socket for " + receive.source + " because of error"e)
            close(selectorreceive.source)
        }
        selector.mute(receive.source)
      }

这里的send完成表示有向client端进行响应的写操作处理完成.
      selector.completedSends.asScala.foreach { send =>
        val resp = inflightResponses.remove(send.destination).getOrElse {
          throw new IllegalStateException(s"Send for ${send.destination} completed, 

               but not in `inflightResponses`")
        }
        resp.request.updateRequestMetrics()
        selector.unmute(send.destination)
      }

如果socket server中包含有已经关闭的连接,减少这个quotas中对此ip的连接数的值.

这个情况包含connect处理超时或者说有connect的消息处理错误被发起了close的请求后的处理成功的消息.
      selector.disconnected.asScala.foreach { connectionId =>
        val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
          throw new IllegalStateException(s"connectionId has unexpected format: 

              $connectionId")
        }.remoteHost
        
connectionQuotas.dec(InetAddress.getByName(remoteHost))
      }

    } catch {
      
case e : ControlThrowable => throw e
      case e : Throwable =>
        error("Processor got uncaught exception."e)
    }
  }

  debug("Closing selector - processor " + id)
  swallowError(closeAll())
  shutdownComplete()
}