关于Kafka java consumer管理TCP连接的讨论

时间:2024-03-24 09:04:08

  本篇是《关于Kafka producer管理TCP连接的讨论》的续篇,主要讨论Kafka java consumer是如何管理TCP连接。实际上,这两篇大部分的内容是相同的,即consumer也是把TCP连接的管理交由底层的Selector类(org.apache.kafka.common.network)来维护。我们依然以“何时创建/创建多少/何时关闭/潜在问题/总结”的顺序来讨论。和上一篇一样,本文将无差别地混用名词TCP和Socket。

一、何时创建TCP连接

  首先明确的是,在构建KafkaConsumer实例时是不会创建任何TCP连接的;另外在调用诸如subscribe或assign的时候也不会创建任何TCP连接。那么TCP连接是在什么时候创建的呢?严格来说有几个可能的时间点。从粗粒度层面来说,我们可以安全地认为Socket连接是在调用consumer.poll()创建的;从细粒度层面来说,TCP连接创建的时机有3个:1. 请求METADATA时;2. 进行组协调时;3. 发送数据时。

二、创建多少个TCP连接

  对于每台broker而言,kafka consumer实例通常会创建3个TCP连接,第一个TCP连接是consumer请求集群元数据时创建的,之后consumer会使用这个Socket继续请求元数据以及寻找group对应的coordinator,如下列日志所示:

[2019-01-01 17:38:22,301] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)
...
[2019-01-01 17:38:22,360] TRACE [Consumer clientId=consumer-1, groupId=test] Sending METADATA {topics=[bar,foo],allow_auto_topic_creation=true} with correlation id 2 to node -1 (org.apache.kafka.clients.NetworkClient:492)
...
[2019-01-01 17:38:22,360] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FIND_COORDINATOR {coordinator_key=test,coordinator_type=0} with correlation id 0 to node -1 (org.apache.kafka.clients.NetworkClient:492)

至于这里为什么是node -1是因为首次请求元数据时尚不确定broker.id,所以只能先用-1替代。

  第二个TCP连接供consumer执行组协调操作使用,这里的组协调操作包括:JOIN_GROUP(加入组)、SYNC_GROUP(等待组分配方案)、HEARTBEAT(心跳请求)、OFFSET_FETCH(获取位移)、OFFSET_COMMIT(提交位移)以及其他请求(比如LEAVE_GROUP,但本例中没有演示组成员退出的情形,故日志中没有出现这个请求类型),如下列日志所示:

[2019-01-01 17:38:22,379] TRACE [Consumer clientId=consumer-1, groupId=test] Sending JOIN_GROUP {group_id=test,session_timeout=10000,rebalance_timeout=300000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=20 cap=20]}]} with correlation id 3 to node 2147483647 (org.apache.kafka.clients.NetworkClient:492)
[2019-01-01 17:38:22,382] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node 2147483647 for JOIN_GROUP with correlation id 3, received {throttle_time_ms=0,error_code=0,generation_id=9,group_protocol=range,leader_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,member_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,members=[{member_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=20 cap=20]}]} (org.apache.kafka.clients.NetworkClient:810)
[2019-01-01 17:38:22,386] TRACE [Consumer clientId=consumer-1, groupId=test] Sending SYNC_GROUP {group_id=test,generation_id=9,member_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,group_assignment=[{member_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=36 cap=36]}]} with correlation id 5 to node 2147483647 (org.apache.kafka.clients.NetworkClient:492)
[2019-01-01 17:38:22,388] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node 2147483647 for SYNC_GROUP with correlation id 5, received {throttle_time_ms=0,error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=36 cap=36]} (org.apache.kafka.clients.NetworkClient:810)
[2019-01-01 17:38:22,396] TRACE [Consumer clientId=consumer-1, groupId=test] Sending OFFSET_FETCH {group_id=test,topics=[{topic=bar,partitions=[{partition=0}]},{topic=foo,partitions=[{partition=0}]}]} with correlation id 6 to node 2147483647 (org.apache.kafka.clients.NetworkClient:492)
[2019-01-03 17:38:22,397] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node 2147483647 for OFFSET_FETCH with correlation id 6, received {throttle_time_ms=0,responses=[{topic=bar,partition_responses=[{partition=0,offset=0,leader_epoch=-1,metadata=,error_code=0}]},{topic=foo,partition_responses=[{partition=0,offset=0,leader_epoch=-1,metadata=,error_code=0}]}],error_code=0} (org.apache.kafka.clients.NetworkClient:810)
...
[2019-01-01 17:38:23,401] TRACE [Consumer clientId=consumer-1, groupId=test] Sending OFFSET_COMMIT {group_id=test,generation_id=9,member_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,topics=[{topic=bar,partitions=[{partition=0,offset=0,leader_epoch=-1,metadata=}]},{topic=foo,partitions=[{partition=0,offset=0,leader_epoch=-1,metadata=}]}]} with correlation id 10 to node 2147483647 (org.apache.kafka.clients.NetworkClient:492)
[2019-01-01 17:38:23,403] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node 2147483647 for OFFSET_COMMIT with correlation id 10, received {throttle_time_ms=0,responses=[{topic=bar,partition_responses=[{partition=0,error_code=0}]},{topic=foo,partition_responses=[{partition=0,error_code=0}]}]} (org.apache.kafka.clients.NetworkClient:810)

  上面标红的节点ID看上去有些奇怪,实际上它是由Integer.MAX_VALUE - coordinator的broker.id计算得来的,因为我的测试环境中只有一台broker且该id是0,所以这个Socket连接的节点ID就是Integer.MAX_VALUE,即2147483647。针对这个node ID的计算方式,Kafka代码是故意为之的,目的就是要让组协调请求和真正的数据获取请求使用不同的Socket连接。
  第三个Socket连接就非常好理解了,就是用于发送FETCH请求的。当consumer代码使用第一个Socket连接获取到集群元数据之后,每个broker的真实ID已经缓存在consumer本地的内存中,因此此时代码会使用真实的ID创建第三个Socket连接并用于消息获取,如下列日志所示:

[2019-01-01 17:38:23,424] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FETCH {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=104064890,session_epoch=2,topics=[],forgotten_topics_data=[]} with correlation id 11 to node 0 (org.apache.kafka.clients.NetworkClient:492)
[2019-01-01 17:38:23,927] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node 0 for FETCH with correlation id 11, received {throttle_time_ms=0,error_code=0,session_id=104064890,responses=[]} (org.apache.kafka.clients.NetworkClient:810)
[2019-01-01 17:38:23,928] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FETCH {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=104064890,session_epoch=3,topics=[],forgotten_topics_data=[]} with correlation id 12 to node 0 (org.apache.kafka.clients.NetworkClient:492)
...

上面标红的节点0是真实的broker.id,可见consumer是使用这个Socket进行消息获取操作的。值得一提的是,当这个Socket连接成功建立之后,第一个Socket连接就会被废弃掉,之后所有的元数据请求都通过第三个Socket发送。

三、何时关闭TCP连接

  和Producer原理相同,consumer关闭Socket也分为主动关闭和Kafka自动关闭。主动关闭依然是由用户发起,显式调用consumer.close()以及类似方法亦或是kill -9;而Kafka自动关闭同样由connections.max.idle.ms参数值控制。和producer有些不同的是,如果用户写consumer程序时使用了循环的方式来poll消息,那么上面提到的所有请求都会不断地发送到broker,故这些Socket连接上总是能保证有请求在发送,因此实现了“长连接”的效果。

四、可能的问题?

  Consumer端和producer端的问题是一样的,即第一个Socket连接仅仅是为了首次(最多也就是几次)获取元数据之用,后面就会被废弃掉。根本的原因在于它使用了“假”的broker id去注册,当 后面consumer获取了真实的broker id之后它无法区分哪个broker id对应这个假ID,所以只能重新创建另外的Socket连接。

五、总结

  最后总结一下当前的结论,针对最新版本Kafka(2.1.0)而言,Java consumer端管理TCP连接的方式是:

1.  KafkaConsumer实例创建时不会创建任何Socket连接,实例创建之后首次请求元数据时会创建第一个Socket连接

2. KafkaConsumer实例拿到元数据信息之后随机寻找其中一个broker去发现对应的coordinator,然后向coordinator所在broker创建第二个Socket连接。之后所有的组协调请求处理都经由该Socket

3. 步骤1中创建的TCP连接只用于首次获取元数据信息,后面会被废弃掉

4. 如果设置consumer端connections.max.idle.ms参数大于0,则步骤1中创建的TCP连接会被自动关闭;如果设置该参数=-1,那么步骤1中创建的TCP连接将成为“僵尸”连接

5. 当前consumer判断是否存在与某broker的TCP连接依靠的是broker id,这是有问题的,依靠<host, port>对可能是更好的方式