Apache Kafka Source Code Analysis: The Client Request/Response Model

Date: 2022-04-17 07:25:00

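        Seen from the source code, the way Kafka responds to clients is, overall, a 1-M-N request/response model (one Acceptor, M Processors, N request handlers), as shown in Figure 1-1. In brief:

  1. Kafka uses ServerSocketChannel with non-blocking Java NIO to serve client requests.
  2. A single Acceptor thread distributes incoming client connections to multiple Processor threads. A simple round-robin load-balancing strategy keeps any single Processor from being overloaded with connections.
  3. The Processor thread group handles the connections handed over by the Acceptor. It runs no business logic: each Processor adds newly accepted connections to its pending queue, puts fully received requests into the RequestChannel, and on every loop iteration takes its own responses from the RequestChannel's response queues and sends them to the clients. The RequestChannel is the buffer in between.
  4. The KafkaRequestHandler thread group consumes requests from the RequestChannel, runs the business logic, and writes the responses back to the RequestChannel (stored in a separate collection).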

Figure 1-1: Kafka client request/response model

    Source walkthrough: the whole flow starts in the startup() method of KafkaServer. Only the startup of the SocketServer, KafkaApis, and KafkaRequestHandler modules is covered here:

1. SocketServer startup

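  • SocketServer defines a number of parameters for thread counts, queue sizes, and quotas, including the number of Processor threads and the maximum number of queued requests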

private val endpoints = config.listeners.map(l => l.listenerName -> l).toMap
private val numProcessorThreads = config.numNetworkThreads
private val maxQueuedRequests = config.queuedMaxRequests
private val totalProcessorThreads = numProcessorThreads * endpoints.size

private val maxConnectionsPerIp = config.maxConnectionsPerIp
private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides
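
As a rough illustration of how numProcessorThreads and the number of endpoints combine into totalProcessorThreads, the sketch below assumes that each listener owns a contiguous slice of the processor array (the processorBeginIndex and processorEndIndex variables in the instantiation code point that way). `ProcessorLayout` and `ranges` are hypothetical names used only for this example, not part of Kafka.

```scala
object ProcessorLayout {
  // For each listener name, the half-open index range [begin, end) of its processors.
  // Assumption: listeners get consecutive, equally sized slices.
  def ranges(listeners: Seq[String], numProcessorThreads: Int): Map[String, Range] =
    listeners.zipWithIndex.map { case (name, i) =>
      val begin = i * numProcessorThreads
      name -> (begin until begin + numProcessorThreads)
    }.toMap

  def main(args: Array[String]): Unit = {
    val layout = ranges(Seq("PLAINTEXT", "SSL"), numProcessorThreads = 3)
    layout.toSeq.sortBy(_._2.start).foreach { case (name, r) =>
      println(s"$name -> processors ${r.start} until ${r.end}")
    }
    // PLAINTEXT -> processors 0 until 3
    // SSL -> processors 3 until 6
  }
}
```

With two listeners and three network threads, the sketch yields six processors in total, three per listener.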

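  • RequestChannel initialization. The requestChannel buffers pending Requests and outgoing Responses between the Processors and the KafkaRequestHandlers; it is the core data structure through which the two exchange data
  • Processor instantiation and Acceptor instantiation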

private val processors = new Array[Processor](totalProcessorThreads)
......
for (i <- processorBeginIndex until processorEndIndex)
  processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol)
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
  processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
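
To make the role of the RequestChannel more concrete, here is a minimal sketch of such a buffer: one bounded request queue shared by all handlers, plus one response queue per Processor. It is a simplified model under stated assumptions, not Kafka's implementation; `MiniRequestChannel`, `MiniRequest`, and `MiniResponse` are hypothetical names, and the method names only mirror the calls that appear in the quoted source (sendRequest, receiveRequest, sendResponse, receiveResponse).

```scala
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, LinkedBlockingQueue, TimeUnit}

// Hypothetical, simplified stand-ins for RequestChannel.Request / Response.
final case class MiniRequest(processor: Int, connectionId: String, payload: String)
final case class MiniResponse(request: MiniRequest, body: String)

class MiniRequestChannel(numProcessors: Int, queueSize: Int) {
  // Shared by all handler threads; bounded, so put() blocks when it is full.
  private val requestQueue = new ArrayBlockingQueue[MiniRequest](queueSize)
  // One queue per processor, so a response returns to the processor that owns the connection.
  private val responseQueues: Array[BlockingQueue[MiniResponse]] =
    Array.fill[BlockingQueue[MiniResponse]](numProcessors)(new LinkedBlockingQueue[MiniResponse]())

  def sendRequest(request: MiniRequest): Unit = requestQueue.put(request)

  // Returns null if nothing arrives within the timeout.
  def receiveRequest(timeoutMs: Long): MiniRequest =
    requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)

  def sendResponse(response: MiniResponse): Unit =
    responseQueues(response.request.processor).put(response)

  // Non-blocking; returns null if this processor has no pending response.
  def receiveResponse(processor: Int): MiniResponse = responseQueues(processor).poll()
}

object MiniRequestChannelDemo {
  def main(args: Array[String]): Unit = {
    val channel = new MiniRequestChannel(numProcessors = 2, queueSize = 10)
    // Processor 1 has received a complete request and queues it.
    channel.sendRequest(MiniRequest(processor = 1, connectionId = "conn-a", payload = "ping"))
    // A handler thread would pick it up and queue the response.
    val request = channel.receiveRequest(timeoutMs = 300)
    channel.sendResponse(MiniResponse(request, "pong"))
    println(channel.receiveResponse(0))      // null: nothing for processor 0
    println(channel.receiveResponse(1).body) // pong
  }
}
```

Keying the response queues by processor index is what lets a response find its way back to the thread that holds the client's socket.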

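  • Startup of the Processor threads. This happens while the Acceptor is being instantiated (the code is in the Acceptor class). At the same time, the server socket is created and the non-blocking NIO selector is set up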

// Create the Selector
private val nioSelector = NSelector.open()
val serverChannel = openServerSocket(endPoint.host, endPoint.port)
// Start the Processor threads
this.synchronized {
  processors.foreach { processor =>
    Utils.newThread(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
      processor, false).start()
  }
}

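  • The Acceptor starts working. It registers the ServerSocketChannel with the NIO Selector and then begins handling connection requests arriving on the ServerSocketChannel. Its core method, accept, turns a client connection request into a SocketChannel and hands it to a Processor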

def run() {
  // Register the ServerSocketChannel with the Selector
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  startupComplete()
  try {
    var currentProcessor = 0
    while (isRunning) {
      try {
        val ready = nioSelector.select(500)
        if (ready > 0) {
          val keys = nioSelector.selectedKeys()
          val iter = keys.iterator()
          while (iter.hasNext && isRunning) {
            try {
              val key = iter.next
              iter.remove()
              if (key.isAcceptable)
                accept(key, processors(currentProcessor))
              else
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")

              // round robin to the next processor thread
              // Load balancing across Processors happens here
              currentProcessor = (currentProcessor + 1) % processors.length
            } catch {
              case e: Throwable => error("Error while accepting connection", e)
            }
          }
        }
      }
      catch {
        // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
        // to a select operation on a specific channel or a bad request. We don't want
        // the broker to stop responding to requests from other clients in these scenarios.
        case e: ControlThrowable => throw e
        case e: Throwable => error("Error occurred", e)
      }
    }
  } finally {
    debug("Closing server socket and selector.")
    swallowError(serverChannel.close())
    swallowError(nioSelector.close())
    shutdownComplete()
  }
}

def accept(key: SelectionKey, processor: Processor) {
  val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
  val socketChannel = serverSocketChannel.accept()
  try {
    connectionQuotas.inc(socketChannel.socket().getInetAddress)
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setKeepAlive(true)
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
      socketChannel.socket().setSendBufferSize(sendBufferSize)

    debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
      .format(socketChannel.socket.getRemoteSocketAddress, socketChannel.socket.getLocalSocketAddress, processor.id,
        socketChannel.socket.getSendBufferSize, sendBufferSize,
        socketChannel.socket.getReceiveBufferSize, recvBufferSize))
    // Hand the connection over to the Processor
    processor.accept(socketChannel)
  } catch {
    case e: TooManyConnectionsException =>
      info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
      close(socketChannel)
  }
}
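
The round-robin assignment in the Acceptor loop can be isolated into a few lines. The sketch below is a toy model in which connections are plain strings and `RoundRobinSketch.assign` is a hypothetical helper; it only shows how `currentProcessor = (currentProcessor + 1) % processors.length` spreads connections across Processors.

```scala
object RoundRobinSketch {
  // Walks the connections in arrival order and hands each one to the next
  // processor index, wrapping around at numProcessors.
  def assign(connections: Seq[String], numProcessors: Int): Map[Int, Seq[String]] = {
    require(numProcessors > 0, "need at least one processor")
    var currentProcessor = 0
    val assigned = Array.fill(numProcessors)(Vector.empty[String])
    connections.foreach { conn =>
      assigned(currentProcessor) = assigned(currentProcessor) :+ conn
      currentProcessor = (currentProcessor + 1) % numProcessors
    }
    assigned.zipWithIndex.map { case (conns, i) => i -> (conns: Seq[String]) }.toMap
  }

  def main(args: Array[String]): Unit = {
    val result = assign(Seq("c1", "c2", "c3", "c4", "c5"), numProcessors = 2)
    println(result(0).mkString(",")) // c1,c3,c5
    println(result(1).mkString(",")) // c2,c4
  }
}
```

Note that this balances the number of connections per Processor, not the traffic they carry; a Processor that happens to receive the busiest clients still does more work.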

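  • Next, how a Processor thread handles the socketChannel. The parts of interest are Processor's run method and the accept method that the Acceptor calls directly. The Acceptor's call to Processor.accept does nothing more than add the socketChannel to the Processor's newConnections member, where it is buffered. The Processor thread itself is a while (isRunning) loop that registers new connections, writes the responses of processed requests back to clients, and so on, as described next.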

def accept(socketChannel: SocketChannel) {
  newConnections.add(socketChannel)
  wakeup()
}

override def run() {
  startupComplete()
  while (isRunning) {
    try {
      // setup any new connections that have been queued up
      configureNewConnections()
      // register any new responses for writing
      processNewResponses()
      poll()
      processCompletedReceives()
      processCompletedSends()
      processDisconnected()
    } catch {
      // We catch all the throwables here to prevent the processor thread from exiting. We do this because
      // letting a processor exit might cause a bigger impact on the broker. Usually the exceptions thrown would
      // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
      // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
      case e: ControlThrowable => throw e
      case e: Throwable =>
        error("Processor got uncaught exception.", e)
    }
  }

  debug("Closing selector - processor " + id)
  swallowError(closeAll())
  shutdownComplete()
}
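
The hand-off between the Acceptor thread and a Processor thread (enqueue, then wake up the selector) can be sketched with a plain java.nio Selector. This is a minimal model under assumptions: `MiniProcessor` is a hypothetical class, connections are strings rather than SocketChannels, and the 300 ms select timeout is chosen only for the example.

```scala
import java.nio.channels.Selector
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}

// Hypothetical stand-in for a Processor; onConfigured plays the role of selector.register.
class MiniProcessor(onConfigured: String => Unit) extends Runnable {
  private val newConnections = new ConcurrentLinkedQueue[String]()
  private val selector = Selector.open()
  @volatile private var running = true

  // Called from the acceptor thread: enqueue, then interrupt a blocking select().
  def accept(connection: String): Unit = {
    newConnections.add(connection)
    selector.wakeup()
  }

  def shutdown(): Unit = {
    running = false
    selector.wakeup()
  }

  // Call only after the processor thread has finished.
  def close(): Unit = selector.close()

  override def run(): Unit = {
    while (running) {
      // Drain everything the acceptor queued since the last iteration.
      var connection = newConnections.poll()
      while (connection != null) {
        onConfigured(connection)
        connection = newConnections.poll()
      }
      // Blocks for up to 300 ms unless wakeup() is called first.
      selector.select(300)
    }
  }
}

object MiniProcessorDemo {
  def main(args: Array[String]): Unit = {
    val configured = new CountDownLatch(2)
    val processor = new MiniProcessor(conn => {
      println(s"configured $conn")
      configured.countDown()
    })
    val thread = new Thread(processor, "mini-processor")
    thread.start()
    processor.accept("conn-1")
    processor.accept("conn-2")
    configured.await(5, TimeUnit.SECONDS)
    processor.shutdown()
    thread.join()
    processor.close()
  }
}
```

Without the wakeup() call, a newly queued connection would sit in newConnections until the current select() timed out; with it, the Processor picks the connection up right away.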

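  • How a new connection is set up, how a new response is handled, and how a completed receive is handled. A Processor sets up a SocketChannel simply by registering it with its Selector (which manages all registered channels in NIO). To handle responses, a Processor takes the Responses that have already been processed from RequestChannel's responseQueues (an Array[BlockingQueue[RequestChannel.Response]]). For a request that has been received in full (held in selector.completedReceives), a Processor ends up calling RequestChannel.sendRequest, which in essence puts the request object on RequestChannel's requestQueue.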

private def configureNewConnections() {
  while (!newConnections.isEmpty) {
    val channel = newConnections.poll()
    try {
      debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
      val localHost = channel.socket().getLocalAddress.getHostAddress
      val localPort = channel.socket().getLocalPort
      val remoteHost = channel.socket().getInetAddress.getHostAddress
      val remotePort = channel.socket().getPort
      val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
      selector.register(connectionId, channel)
    } catch {
      // We explicitly catch all non fatal exceptions and close the socket to avoid a socket leak. The other
      // throwables will be caught in processor and logged as uncaught exceptions.
      case NonFatal(e) =>
        val remoteAddress = channel.getRemoteAddress
        // need to close the channel here to avoid a socket leak.
        close(channel)
        error(s"Processor $id closed connection from $remoteAddress", e)
    }
  }
}

private def processNewResponses() {
  var curr = requestChannel.receiveResponse(id)
  while (curr != null) {
    try {
      curr.responseAction match {
        case RequestChannel.NoOpAction =>
          // There is no response to send to the client, we need to read more pipelined requests
          // that are sitting in the server's socket buffer
          curr.request.updateRequestMetrics
          trace("Socket server received empty response to send, registering for read: " + curr)
          val channelId = curr.request.connectionId
          if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
            selector.unmute(channelId)
        case RequestChannel.SendAction =>
          sendResponse(curr)
        case RequestChannel.CloseConnectionAction =>
          curr.request.updateRequestMetrics
          trace("Closing socket connection actively according to the response code.")
          close(selector, curr.request.connectionId)
      }
    } finally {
      curr = requestChannel.receiveResponse(id)
    }
  }
}

private def processCompletedReceives() {
  selector.completedReceives.asScala.foreach { receive =>
    try {
      val openChannel = selector.channel(receive.source)
      val session = {
        // Only methods that are safe to call on a disconnected channel should be invoked on 'channel'.
        val channel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
        RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
      }
      val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
        buffer = receive.payload, startTimeMs = time.milliseconds, listenerName = listenerName,
        securityProtocol = securityProtocol)
      requestChannel.sendRequest(req)
      selector.mute(receive.source)
    } catch {
      case e @ (_: InvalidRequestException | _: SchemaException) =>
        // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
        error(s"Closing socket for ${receive.source} because of error", e)
        close(selector, receive.source)
    }
  }
}
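
One detail in the code above is easy to miss: after a request is queued, the connection is muted (selector.mute), and it is unmuted again when a NoOpAction response comes back. The toy state model below illustrates that cycle. It is a sketch under assumptions: `MuteSketch` and `Channels` are hypothetical names, and unmuting directly on SendAction is a simplification, since the quoted source does not show where the unmute after a send takes place (presumably once the send has completed).

```scala
object MuteSketch {
  sealed trait ResponseAction
  case object SendAction extends ResponseAction
  case object NoOpAction extends ResponseAction
  case object CloseConnectionAction extends ResponseAction

  // Which connections are open, and which of them are muted (not being read from).
  final case class Channels(open: Set[String], muted: Set[String]) {
    // A complete request arrived: stop reading from this connection until it is answered.
    def onCompletedReceive(id: String): Channels =
      if (open(id)) copy(muted = muted + id) else this

    // A response for this connection came back from the handlers.
    def onResponse(id: String, action: ResponseAction): Channels = action match {
      case NoOpAction | SendAction => copy(muted = muted - id) // resume reading
      case CloseConnectionAction   => Channels(open - id, muted - id)
    }
  }

  def main(args: Array[String]): Unit = {
    val start = Channels(open = Set("conn-a", "conn-b"), muted = Set.empty)
    val afterReceive = start.onCompletedReceive("conn-a")
    println(afterReceive.muted)                               // Set(conn-a)
    println(afterReceive.onResponse("conn-a", NoOpAction).muted) // Set()
  }
}
```

Muting means at most one request per connection is in flight inside the broker at any time, which keeps responses on a connection in the same order as its requests.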

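  • At this point the logic of the Acceptor and the Processors is fairly clear. Overall, the Acceptor spreads incoming NIO connections across multiple Processors, which serve client requests concurrently, while the SocketChannels stay connected in non-blocking NIO mode. A fully received request is put on the RequestChannel's request queue, and a result stored in one of the RequestChannel's response queues is picked up by the owning Processor and returned to the corresponding client. Next, how a request is actually processed.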

2. KafkaApis instantiation

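  • KafkaApis is an ordinary Scala class whose primary constructor takes a reference to the RequestChannel. KafkaApis is where the real processing of client requests begins. Its core is the handle method, which shows the request types the broker can respond to; how each type is handled is not covered here.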

apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator,
  kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
  clusterId, time)

def handle(request: RequestChannel.Request) {
  try {
    trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
      format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
    ApiKeys.forId(request.requestId) match {
      case ApiKeys.PRODUCE => handleProducerRequest(request)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
      case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
      case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
      case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
      case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
      case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
      case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
      case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
      case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
      case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
      case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
      // Request to create topics
      case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
      case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
      case requestId => throw new KafkaException("Unknown api code " + requestId)
    }
  } catch {
    case e: Throwable =>
      if (request.requestObj != null) {
        request.requestObj.handleError(e, requestChannel, request)
        error("Error when handling request %s".format(request.requestObj), e)
      } else {
        val response = request.body.getErrorResponse(e)

        /* If request doesn't have a default error response, we just close the connection.
           For example, when produce request has acks set to 0 */
        if (response == null)
          requestChannel.closeConnection(request.processor, request)
        else
          requestChannel.sendResponse(new Response(request, response))

        error("Error when handling request %s".format(request.body), e)
      }
  } finally
    request.apiLocalCompleteTimeMs = time.milliseconds
}
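
The shape of handle (dispatch on the API key, with one catch-all that turns any failure into an error response) can be reduced to a small sketch. Everything here is hypothetical: the three keys, the `Req` type, and the string responses stand in for the real ApiKeys and response objects. Unlike the quoted code, which catches Throwable, the sketch only catches non-fatal exceptions.

```scala
import scala.util.control.NonFatal

object DispatchSketch {
  // Hypothetical API keys; the real list lives in ApiKeys.
  sealed trait ApiKey
  case object Produce extends ApiKey
  case object Fetch extends ApiKey
  case object Metadata extends ApiKey

  final case class Req(key: ApiKey, body: String)

  private def handleProduce(r: Req): String = s"produced:${r.body}"

  private def handleFetch(r: Req): String =
    if (r.body.isEmpty) throw new IllegalArgumentException("empty fetch")
    else s"fetched:${r.body}"

  // Route by key; any failure becomes an error response instead of escaping to the caller.
  def handle(r: Req): String =
    try {
      r.key match {
        case Produce => handleProduce(r)
        case Fetch   => handleFetch(r)
        case other   => throw new UnsupportedOperationException(s"Unknown api code $other")
      }
    } catch {
      case NonFatal(e) => s"error:${e.getMessage}"
    }

  def main(args: Array[String]): Unit = {
    println(handle(Req(Produce, "m1")))  // produced:m1
    println(handle(Req(Fetch, "")))      // error:empty fetch
    println(handle(Req(Metadata, "t")))  // error:Unknown api code Metadata
  }
}
```

Keeping the error handling in one place means an individual handler can simply throw, and the calling thread still survives to serve the next request.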

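  • Taking a create-topics request as an example, the code that follows shows how KafkaApis builds a Response object and puts it into RequestChannel's responseQueues. Topic creation itself is ultimately done by the adminManager module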

def handleCreateTopicsRequest(request: RequestChannel.Request) {
  val createTopicsRequest = request.body.asInstanceOf[CreateTopicsRequest]

  def sendResponseCallback(results: Map[String, CreateTopicsResponse.Error]): Unit = {
    val responseBody = new CreateTopicsResponse(results.asJava, request.header.apiVersion)
    trace(s"Sending create topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
    // Put the result on the RequestChannel's response queue
    requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
  }

  // Check the state of the controller module
  if (!controller.isActive) {
    val results = createTopicsRequest.topics.asScala.map { case (topic, _) =>
      (topic, new CreateTopicsResponse.Error(Errors.NOT_CONTROLLER, null))
    }
    sendResponseCallback(results)
  } else if (!authorize(request.session, Create, Resource.ClusterResource)) {

    val results = createTopicsRequest.topics.asScala.map { case (topic, _) =>
      (topic, new CreateTopicsResponse.Error(Errors.CLUSTER_AUTHORIZATION_FAILED, null))
    }
    sendResponseCallback(results)
  } else {
    // The normal path goes through here
    val (validTopics, duplicateTopics) = createTopicsRequest.topics.asScala.partition { case (topic, _) =>
      !createTopicsRequest.duplicateTopics.contains(topic)
    }

    // Special handling to add duplicate topics to the response
    def sendResponseWithDuplicatesCallback(results: Map[String, CreateTopicsResponse.Error]): Unit = {

      val duplicatedTopicsResults =
        if (duplicateTopics.nonEmpty) {
          val errorMessage = s"Create topics request from client `${request.header.clientId}` contains multiple entries " +
            s"for the following topics: ${duplicateTopics.keySet.mkString(",")}"
          // We can send the error message in the response for version 1, so we don't have to log it any more
          if (request.header.apiVersion == 0)
            warn(errorMessage)
          duplicateTopics.keySet.map((_, new CreateTopicsResponse.Error(Errors.INVALID_REQUEST, errorMessage))).toMap
        } else Map.empty

      val completeResults = results ++ duplicatedTopicsResults
      // Invoke the callback
      sendResponseCallback(completeResults)
    }

    adminManager.createTopics(
      createTopicsRequest.timeout,
      createTopicsRequest.validateOnly,
      validTopics,
      sendResponseWithDuplicatesCallback
    )
  }
}
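
The callback wrapping used in the normal path can be shown in isolation. In the sketch below, `adminCreate` is a hypothetical stand-in for adminManager.createTopics that always succeeds, and results are plain strings ("NONE", "INVALID_REQUEST") rather than CreateTopicsResponse.Error objects. The point is only how the duplicates are split off first and merged back into the result before the final callback runs.

```scala
object CreateTopicsSketch {
  type Results = Map[String, String] // topic -> error code name ("NONE" on success)

  // Hypothetical admin layer: "creates" the topics and reports the outcome through a callback.
  def adminCreate(topics: Set[String], callback: Results => Unit): Unit =
    callback(topics.map(t => t -> "NONE").toMap)

  def handle(requested: Seq[String], respond: Results => Unit): Unit = {
    // Topics that appear more than once in the request are rejected, not created.
    val duplicates: Set[String] =
      requested.groupBy(identity).collect { case (topic, hits) if hits.size > 1 => topic }.toSet
    val valid: Set[String] = requested.toSet -- duplicates

    // Wraps the final callback so that duplicate topics show up in the response as errors.
    def respondWithDuplicates(results: Results): Unit =
      respond(results ++ duplicates.map(t => t -> "INVALID_REQUEST"))

    adminCreate(valid, respondWithDuplicates)
  }

  def main(args: Array[String]): Unit =
    handle(Seq("orders", "users", "orders"), results =>
      results.toSeq.sorted.foreach { case (topic, code) => println(s"$topic -> $code") })
    // orders -> INVALID_REQUEST
    // users -> NONE
}
```

Because the response is produced inside a callback, the admin layer is free to complete the work later (for example after a timeout) without blocking the thread that called handle.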

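  • How client requests are processed is now clear, but one question remains: what drives KafkaApis? Since there are multiple Processor threads, the component that drives KafkaApis in this concurrency model should also be a group of threads. Indeed, this step is carried out by the KafkaRequestHandler thread group, shown below.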

3. KafkaRequestHandler

  • KafkaRequestHandlers concurrently process the requests in RequestChannel's requestQueue and call KafkaApis to handle them. The KafkaRequestHandler thread group is created by the KafkaRequestHandlerPool object

requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
  config.numIoThreads)

class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              time: Time,
                              numThreads: Int) extends Logging with KafkaMetricsGroup {

  /* a meter to track the average free capacity of the request handlers */
  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

  this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
  val threads = new Array[Thread](numThreads)
  val runnables = new Array[KafkaRequestHandler](numThreads)
  for(i <- 0 until numThreads) {
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
    threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
    threads(i).start()
  }

  def shutdown() {
    info("shutting down")
    for(handler <- runnables)
      handler.shutdown
    for(thread <- threads)
      thread.join
    info("shut down completely")
  }
}

  • How a KafkaRequestHandler processes requests. The method of interest is run, which shows that KafkaRequestHandler handles a request by calling KafkaApis.handle.

def run() {
  while(true) {
    try {
      var req : RequestChannel.Request = null
      while (req == null) {
        // We use a single meter for aggregate idle percentage for the thread pool.
        // Since meter is calculated as total_recorded_value / time_window and
        // time_window is independent of the number of threads, each recorded idle
        // time should be discounted by # threads.
        val startSelectTime = time.nanoseconds
        req = requestChannel.receiveRequest(300)
        val idleTime = time.nanoseconds - startSelectTime
        aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
      }

      if(req eq RequestChannel.AllDone) {
        debug("Kafka request handler %d on broker %d received shut down command".format(
          id, brokerId))
        return
      }
      req.requestDequeueTimeMs = time.milliseconds
      trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
      // Call KafkaApis to handle the request
      apis.handle(req)
    } catch {
      case e: Throwable => error("Exception when handling request", e)
    }
  }
}
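
The handler pool and its run loop boil down to N threads polling a shared queue with a timeout and exiting when they see a sentinel. The following is a minimal sketch of that pattern, not Kafka's code: `HandlerPoolSketch`, `HandlerPool`, and `Work` are hypothetical names, and the assumption that shutdown enqueues one `AllDone` sentinel per thread is inferred from the `req eq RequestChannel.AllDone` check above rather than shown in the quoted source.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object HandlerPoolSketch {
  sealed trait Message
  final case class Work(id: Int) extends Message
  case object AllDone extends Message // shutdown sentinel, one per thread (assumption)

  class HandlerPool(numThreads: Int, handle: Work => Unit) {
    private val queue = new LinkedBlockingQueue[Message]()

    private val threads: Seq[Thread] = (0 until numThreads).map { i =>
      val thread = new Thread(new Runnable {
        override def run(): Unit = {
          var done = false
          while (!done) {
            queue.poll(300, TimeUnit.MILLISECONDS) match {
              case null    => // timed out with nothing to do; poll again
              case AllDone => done = true
              case w: Work =>
                // A failing request must not kill the handler thread.
                try handle(w)
                catch { case e: Exception => println(s"handler $i failed on $w: $e") }
            }
          }
        }
      }, s"mini-request-handler-$i")
      thread.setDaemon(true)
      thread.start()
      thread
    }

    def submit(work: Work): Unit = queue.put(work)

    // Enqueue one sentinel per thread, then wait for every thread to exit.
    def shutdown(): Unit = {
      threads.foreach(_ => queue.put(AllDone))
      threads.foreach(_.join())
    }
  }

  def main(args: Array[String]): Unit = {
    val handled = new AtomicInteger(0)
    val pool = new HandlerPool(3, (_: Work) => { handled.incrementAndGet(); () })
    (1 to 10).foreach(i => pool.submit(Work(i)))
    pool.shutdown()
    println(s"handled ${handled.get} requests") // handled 10 requests
  }
}
```

Because the queue is FIFO and the sentinels are added last, every request submitted before shutdown is handled before the threads exit.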