apache kafka源码分析走读-ZookeeperConsumerConnector分析

时间:2021-01-27 16:48:31

1.ZookeeperConsumer架构

ZookeeperConsumer类中consumer运行过程架构图:

apache kafka源码分析走读-ZookeeperConsumerConnector分析

                                                                                                图1

过程分析:

ConsumerGroupExample类

2.消费者线程(consumer thread),队列,拉取线程(fetch thread)三者之间关系

每一个topic至少需要创建一个consumer thread,如果有多个partitions,则可以创建多个consumer thread线程,consumer thread>==partitions数量,否则会有consumer thread空闲。

部分代码示例如下:

ConsumerConnector consumer

consumer = kafka.consumer.Consumer.createJavaConsumerConnector(

            createConsumerConfig());

 Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

 topicCountMap.put("test-string-topic", new Integer(1));  //value表示consumer thread线程数量

 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

具体说明一下三者关系:

(1).topic的partitions分布规则

paritions是安装kafka brokerId有序分配的。

例如现在有三个node安装了kafka broker服务端程序,brokerId分别设置为1,2,3,现在准备一个topic为test-string-topic,并且分配12个partitons,此时partitions的kafka broker节点分布情况为 ,partitions索引编号为0,3,6,9等4个partitions在brokerId=1上,1,4,7,10在brokerId=2上,2,5,8,11在brokerId=3上。

创建consumer thread

consumer thread数量与BlockingQueue一一对应。

a.当consumer thread count=1时

此时有一个blockingQueue1,三个fetch thread线程,该topic分布在几个node上就有几个fetch thread,每个fetch thread会于kafka broker建立一个连接。3个fetch thread线程去拉取消息数据,最终放到blockingQueue1中,等待consumer thread来消费。

消费者线程,缓冲队列,partitions分布列表如下

consumer线程

Blocking Queue

partitions

consumer thread1

blockingQueue1

0,1,2,3,4,5,6,7,8,9,10,11

fetch thread与partitions分布列表如下

fetch线程

partitions

fetch thread1

0,3,6,9

fetch thread2

1,4,7,10

fetch thread3

2,5,8,11



b. consumer thread count=2时

此时有consumerThread1和consumerThread2分别对应2个队列blockingQueue1,blockingQueue2,这2个消费者线程消费partitions依次为:0,1,2,3,4,5与6,7,8,9,10,11;消费者线程,缓冲队列,partitions分布列表如下

consumer线程

Blocking Queue

partitions

consumer thread1

blockingQueue1

0,1,2,3,4,5

consumer thread2

blockingQueue2

6,7,8,9,10,11


fetch thread与partitions分布列表如下

fetch线程

partitions

fetch thread1

0,3,6,9

fetch thread2

1,4,7,10

fetch thread3

2,5,8,11


c. consumer thread count=4时

消费者线程,缓冲队列,partitions分布列表如下

consumer线程

Blocking Queue

partitions

consumer thread1

blockingQueue1

0,1,2

consumer thread2

blockingQueue2

3,4,5

consumer thread3

blockingQueue3

6,7,8

consumer thread4

blockingQueue4

9,10,11

fetch thread与partitions分布列表如下

同上

同理当消费线程consumer thread count=n,都是安装上述分布规则来处理的。


3.consumer消息线程以及队列创建逻辑

运用ZookeeperConsumerConnector类创建多线程并行消费测试类,ConsumerGroupExample类初始化,调用createMessageStreams方法,实际是在consume方法处理的逻辑,创建KafkaStream,以及阻塞队列(LinkedBlockingQueue),KafkaStream与队列个数一一对应,消费者线程数量决定阻塞队列的个数。

registerConsumerInZK()方法:设置消费者组,注册消费者信息consumerIdString到zookeeper上。

consumerIdStrin*生规则部分代码如下:

    String consumerUuid = null;
if(config.consumerId!=null && config.consumerId)
consumerUuid = consumerId;
else {
String uuid = UUID.randomUUID()

consumerUuid = "%s-%d-%s".format(
InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
uuid.getMostSignificantBits().toHexString.substring(0,8));
}
String consumerIdString = config.groupId + "_" + consumerUuid;

kafka zookeeper注册模型结构或存储结构如下:

kafka在zookeeper中存储结构           

说明:目前把kafka中绝大部分存储模型都列表出来了,当前还有少量不常使用的,暂时还没有列举,后续会加上。


consumer初始化逻辑处理:

1.实例化并注册loadBalancerListener监听,ZKRebalancerListener监听consumerIdString状态变化

 

触发consumer reblance条件如下几个:

ZKRebalancerListener:当/kafka01/consumer/[consumer-group]/ids子节点变化时,会触发

ZKTopicPartitionChangeListener:当该topic的partitions发生变化时,会触发。

      val topicPath = "/kafka01/brokers/topics" + "/" + "topic-1"
      zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)


consumer reblance逻辑


consumer offset更新机制


reblance计算规则:


后续.....