apache kafka系列之源码分析走读-SocketServer分析

时间:2021-04-09 16:46:40

SocketServer网络架构流程请阅读-server端网络架构分析

本文对Kafka最新版0.8.2.1中SocketServer源码进行分析,比起0.8.x版本,该版本增加了新特性,客户端请求ip连接数限制,Processor空闲时间统计,空闲连接资源回收等,设置此这些参数有效保证进程资源不会被连接数耗尽。

SocketServer设计思路

Kafka SocketServer是基于Java NIO来开发的,采用了Reactor的模式,其中包含了1个Acceptor负责接受客户端请求,N个Processor线程负责读写数据,M个Handler来处理业务逻辑。在Acceptor和Processor,Processor和Handler之间都有队列来缓冲请求。


启动初始化工作,创建一个ConnectionQuotas对象负责管理连接数/IP,创建一个Acceptor线程负责监听客户端请求,并建立和客户端的数据链路建立连接,然后转交给Processor线程进行处理,创建N个Processor负责客户端数据读写。其中Acceptor线程和N个Processor线程中每个线程都独立创建Selector.open()多路复用器,具体请参考并发编程网:Java NIO系列教程(六) Selector

  def startup() {
val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
for(i <- 0 until numProcessorThreads) {
processors(i) = new Processor(i,
time,
maxRequestSize,
aggregateIdleMeter,
newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)),
numProcessorThreads,
requestChannel,
quotas,
connectionsMaxIdleMs)
}

newGauge("ResponsesBeingSent", new Gauge[Int] {
def value = processors.foldLeft(0) { (total, p) => total + p.countInterestOps(SelectionKey.OP_WRITE) }
})

// register the processor threads for notification of responses
requestChannel.addResponseListener((id:Int) => processors(id).wakeup())

// start accepting connections
this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
acceptor.awaitStartup

}

kafka.net.Acceptor

Acceptor主要逻辑分析

1.创建监听对象ServerSocketChannel,启动监听服务。

val serverChannel = openServerSocket(host, port)


Acceptor类中run方法运行逻辑如下:

2.首先在ServerSocketChannel上注册OP_ACCEPT事件

serverChannel.register(selector, SelectionKey.OP_ACCEPT);

3.等待客户端请求,如没有请求select每间隔500ms返回一次。

4.如果有请求进来,则将其分配给当前的processor,并且把当前processor指向下一个processor,也就是说它采用了Round Robin的方式来选择processor。

   while(isRunning) {
val ready = selector.select(500)
if(ready > 0) {
val keys = selector.selectedKeys()
val iter = keys.iterator()
while(iter.hasNext && isRunning) {
var key: SelectionKey = null
try {
key = iter.next
iter.remove()
if(key.isAcceptable)
accept(key, processors(currentProcessor))
else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")

// round robin to the next processor thread
currentProcessor = (currentProcessor + 1) % processors.length
} catch {
case e: Throwable => error("Error while accepting connection", e)
}
}
}
}


5.基础知识介绍,下面的SelectionKey是表示一个Channel和Selector的注册关系。在Acceptor中的selector,只有监听客户端连接请求的ServerSocketChannel的OP_ACCEPT事件注册在上面。当selector的select方法返回时,则表示注册在它上面的Channel发生了对应的事件。在Acceptor中,这个事件就是OP_ACCEPT,表示这个ServerSocketChannel的OP_ACCEPT事件发生了。


6.Acceptor的accept方法的处理逻辑为:首先通过SelectionKey来拿到对应的ServerSocketChannel,并调用其accept方法来建立和客户端的连接,然后拿到对应的SocketChannel并交给了Processor。然后Acceptor的任务就完成了,开始去处理下一个客户端的连接请求。

 def accept(key: SelectionKey, processor: Processor) {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
val socketChannel = serverSocketChannel.accept()
try {
connectionQuotas.inc(socketChannel.socket().getInetAddress)
socketChannel.configureBlocking(false)
socketChannel.socket().setTcpNoDelay(true)
socketChannel.socket().setSendBufferSize(sendBufferSize)

processor.accept(socketChannel)
} catch {
case e: TooManyConnectionsException =>
info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
close(socketChannel)
}
}


kafka.net.Processor

Processor线程主要职责是负责从客户端读取数据和将响应返回给客户端,其自身不处理具体的业务逻辑。每个Processor都有一个Selector多路复用器,用来监听多个客户端,因此可以非阻塞地处理多个客户端的读写请求


处理新建连接

从上一节中可以看到,Acceptor会把多个客户端的数据连接SocketChannel分配一个Processor,因此每个Processor内部都有一个队列来保存这些新来的数据连接:

private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()

Processor的accpet方法逻辑如下

把socketChannel放入新建连接链路队列中,然后唤醒Processor的Selector。

  def accept(socketChannel: SocketChannel) {
newConnections.add(socketChannel)
wakeup()
}

Processor类中run方法运行逻辑如下

首先调用configureNewConnections,如果队列中又新连接(SocketChannel),则将该SocketChannel的OP_READ事件注册到Processor对应的Selector上

  private def configureNewConnections() {
while(newConnections.size() > 0) {
val channel = newConnections.poll()
debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress)
channel.register(selector, SelectionKey.OP_READ)
}
}


下面截取部分代码是Processor线程做事件循环读写数据以及回收连接资源逻辑,后续对各个部分进行详细分析

      val ready = selector.select(300)
currentTimeNanos = SystemTime.nanoseconds
val idleTime = currentTimeNanos - startSelectTime
idleMeter.mark(idleTime)

aggregateIdleMeter.mark(idleTime / totalProcessorThreads)

trace("Processor id " + id + " selection time = " + idleTime + " ns")
if(ready > 0) {
val keys = selector.selectedKeys()
val iter = keys.iterator()
while(iter.hasNext && isRunning) {
var key: SelectionKey = null
try {
key = iter.next
iter.remove()
if(key.isReadable)
read(key)
else if(key.isWritable)
write(key)
else if(!key.isValid)
close(key)
else
throw new IllegalStateException("Unrecognized key state for processor thread.")
} catch {
case e: EOFException => {
info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
close(key)
} case e: InvalidRequestException => {
info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage))
close(key)
} case e: Throwable => {
error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e)
close(key)
}
}
}
}
maybeCloseOldestConnection
}

读取客户端数据逻辑分析:

a.把当前SelectionKey和事件循环时间放入LRU映射表中,将来检查时回收连接资源。

b.从SelectionKey中拿到对应的SocketChannel,并且取出attach在SelectionKey上的Receive对象,如果是第一次读取,Receive对象为null,则创建一个BoundedByteBufferReceive,由它来处理具体的读数据的逻辑。可以看到每个客户端都有一个Receive对象来读取数据.

c.如果数据从客户端读取完毕(receive.complete),则将读取的数据封装成Request对象,并添加到requestChannel中去。如果没有读取完毕(可能是客户端还没有发送完或者网络延迟),那么就让selector继续监听这个通道的OP_READ事件。

因此,我们知道具体读取数据是在BoundedByteBufferReceive里面完成的,而读取完成后要交给RequestChannel,接下来我们来看这两部分的代码。


  def read(key: SelectionKey) {
lruConnections.put(key, currentTimeNanos)
val socketChannel = channelFor(key)
var receive = key.attachment.asInstanceOf[Receive]
if(key.attachment == null) {
receive = new BoundedByteBufferReceive(maxRequestSize)
key.attach(receive)
}
val read = receive.readFrom(socketChannel)
val address = socketChannel.socket.getRemoteSocketAddress();
trace(read + " bytes read from " + address)
if(read < 0) {
close(key)
} else if(receive.complete) {
val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)
requestChannel.sendRequest(req)
key.attach(null)
key.interestOps(key.interestOps & (~SelectionKey.OP_READ))
} else {
// more reading to be done
trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress())
key.interestOps(SelectionKey.OP_READ)
wakeup()
}
}

BoundedByteBufferReceive

BoundedByteBufferReceive中有2个ByteBuffer,分别是sizeBuffer和contentBuffer,其中sizeBuffer是固定的4个字节,表示这次发送来的数据总共有多大,随后再读取对应大小的数据放到contentBuffer中。

核心处理逻辑都是在readFrom这个方法中,代码如下:

  def readFrom(channel: ReadableByteChannel): Int = {
expectIncomplete()
var read = 0
if(sizeBuffer.remaining > 0)
read += Utils.read(channel, sizeBuffer)
if(contentBuffer == null && !sizeBuffer.hasRemaining) {
sizeBuffer.rewind()
val size = sizeBuffer.getInt()
if(size <= 0)
throw new InvalidRequestException("%d is not a valid request size.".format(size))
if(size > maxSize)
throw new InvalidRequestException("Request of length %d is not valid, it is larger than the maximum size of %d bytes.".format(size, maxSize))
contentBuffer = byteBufferAllocate(size)
}
if(contentBuffer != null) {
read = Utils.read(channel, contentBuffer)
if(!contentBuffer.hasRemaining) {
contentBuffer.rewind()
complete = true
}
}
read
}

首先检查sizeBuffer是不是都读满了,没有的话就从对应的channel中读取数据放到sizeBuffer中,就是下面这句,它会从channel中读取最多等同于sizeBuffer中剩下空间数量的数据。

Utils.read(channel, sizeBuffer)

当sizeBuffer读取完成了,就知道真正的数据有多少了,因此就是按照这个大小来分配contentBuffer了。紧接着就是从channel读取真正的数据放到contentBuffer中,当把contentBuffer读满以后就停止了并把complet标记为true。因此,可以看到客户端在发送数据的时候需要先发送这次要发送数据的大小,然后再发送对应的数据。

这样设计是因为java NIO在从channel中读取数据的时候只能指定读多少,而且数据也不是一次就能全部读取完成的,用这种方式来保证数据都读进来了。

到此为止,我们知道了Processor是如何读取数据的。简而言之,Processor通过selector来监听它负责的那些数据通道,当通道上有数据可读时,它就是把这个事情交给BoundedByteBufferReceive。BoundedByteBufferReceive先读一个int来确定数据量有多少,然后再读取真正的数据。那数据读取进来后又是如何被处理的呢?下一节来分析对应的代码。

kafka.network.RequestChannel

RequestChannel是Processor和Handler交换数据的地方。它包含了一个队列requestQueue用来存放Processor加入的Request,Handler会从里面取出Request来处理;它还为每个Processor开辟了一个respondQueue,用来存放Handler处理了Request后给客户端的Response。下面是一些源码:

初始化requestQueue和responseQueues的代码:

  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
for(i <- 0 until numProcessors)
responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()


sendRequest方法:Processor在读取完数据后,将数据封装成一个Request对象然后调用这个方法将Request添加到requestQueue中。如果requestQueue满的话,这个方法会阻塞在这里直到有Handler取走一个Request。

  def sendRequest(request: RequestChannel.Request) {
requestQueue.put(request)
}

receiveRequest方法:Handler从requestQueue中取出Request,如果队列为空,这个方法会阻塞在这里直到有Processor加入新的Request。

  def receiveRequest(): RequestChannel.Request =
requestQueue.take()

类似的sendResponse和receiveResponse就写在这里,唯一的区别就是添加和取出Response的时候要指定Processor的id因为每个Processor都有其对应的responseQueue。

返回数据给客户端

Processor不仅负责从客户端读取数据,还要将Handler的处理结果返回给客户端。在Processor的run方法(Processor是一个线程类),它会调用processNewResponses()来处理Handler的提供给客户端的Response。简化的代码如下:

  private def processNewResponses() {
var curr = requestChannel.receiveResponse(id)
while(curr != null) {
val key = curr.request.requestKey.asInstanceOf[SelectionKey]
curr.responseAction match {
case RequestChannel.SendAction => {
key.interestOps(SelectionKey.OP_WRITE)
key.attach(curr)
}
}
curr = requestChannel.receiveResponse(id)
}
}

它依次把requestChannel中responseQueue的Response取出来,然后将对应通道的OP_WRITE事件注册到selector上。这和上面的configureNewConnections很类似。

然后当selector的select方法返回时,检查是否有通道是WRITEABLE,如果有则调用Processor中的write方法。在write方法中,Processor又将具体写数据的任务交给了Response中的Send对象。这和读取数据的处理方式非常类似,就不细说了。

到此为止,我们分析了Processor是如何从客户端读取数据的,以及如何将Handler处理后的响应返回给客户端。下一节将简要分析一下Handler。

kafka.server.KafkaRequestHandler

Handler的职责是从requestChannel中的requestQueue取出Request,处理以后再将Response添加到requestChannel中的responseQueue中。 因为Handler是处理具体业务的,所以它可以有不同的实现,或者把具体的处理再外包出去。我们就简要看一下KafkaRequestHandler是如何做的。

KafkaRequestHandler实现了Runnable,因此是个线程类,除去错误处理的代码后,其run方法可以简化为如下代码,它把所有的处理逻辑都交给了KafkaApis:

  def run() {
while(true) {
var req : RequestChannel.Request = requestChannel.receiveRequest(300)
apis.handle(req)
}
}

因为KafkaApis是和具体业务相关,详细分析请看KafkaApis详解

ip连接数限制逻辑

核心逻辑代码如下

连接计数器被多个不同线程(Acceptor,Processor)调用,为了保证操作时线程安全,调用计数方法必须加锁。计数器是按照IP分类计算的。

a.当有新请求进入时,且成功建立连接链路时,Acceptor中accept调用inc方法计数器加一,如果大于设置IP连接阀值抛出TooManyConnectionsException异常。

b.当服务端关闭连接时,Processor中close调用dec方法计数器减一

  def inc(addr: InetAddress) {
counts synchronized {
val count = counts.getOrElse(addr, 0)
counts.put(addr, count + 1)
val max = overrides.getOrElse(addr, defaultMax)
if(count >= max)
throw new TooManyConnectionsException(addr, max)
}
}

def dec(addr: InetAddress) {
counts synchronized {
val count = counts.get(addr).get
if(count == 1)
counts.remove(addr)
else
counts.put(addr, count - 1)
}
}

连接资源回收处理

a.每轮事件循环后调用maybeCloseOldestConnection方法,判断是否有长期空闲连接资源需要回收。判断逻辑在maybeCloseOldestConnection中处理,下节做分析。

  override def run() {
startupComplete()
while(isRunning) {
val startSelectTime = SystemTime.nanoseconds
val ready = selector.select(300)
currentTimeNanos = SystemTime.nanoseconds
if(ready > 0) {
val keys = selector.selectedKeys()
val iter = keys.iterator()
while(iter.hasNext && isRunning) {

}
}
maybeCloseOldestConnection
}
}

空闲连接资源回收逻辑

a.如果当轮事件循环发生时间大于最久连接访问时间 + 连接最大空闲时间,则程序执行进入连接资源回收逻辑。

b.如果当前LRU映射表为空,则nextIdleCloseCheckTime指向下一个时间点。

c.如果LRU映射表不为为空,从LRU取出最久未访问连接时间点加上连接最大空闲时间与当前时间点做比较,小于则关闭此连接,否则不做任何处理.

  private def maybeCloseOldestConnection {
if(currentTimeNanos > nextIdleCloseCheckTime) {
if(lruConnections.isEmpty) {
nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos
} else {
val oldestConnectionEntry = lruConnections.entrySet.iterator().next()
val connectionLastActiveTime = oldestConnectionEntry.getValue
nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos
if(currentTimeNanos > nextIdleCloseCheckTime) {
val key: SelectionKey = oldestConnectionEntry.getKey
close(key)
}
}
}
}














参考引用:

http://blog.csdn.net/jewes/article/details/42403721