Kafka's znode structure in ZooKeeper

Date: 2022-12-31 13:46:48
  • Topic registration info

      /brokers/topics/[topic] : stores the full partition assignment info for a topic

  
  
  Schema:
  {
    "version": version number, currently fixed at 1,
    "partitions": { "partitionId": [ brokerId list of the in-sync replica set ], "partitionId": [ brokerId list of the in-sync replica set ], ... }
  }
  Example:
  {
    "version": 1,
    "partitions": {"0": [1, 2], "1": [2, 1], "2": [1, 2]}
  }
  Note: the keys under "partitions" are the partition IDs, and each value is the brokerId list of that partition's in-sync replica set.
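  To see this assignment in place, the znode can be read back with the plain Apache ZooKeeper Java client. A minimal sketch; the connect string localhost:2181, the session timeout, and the topic name my-topic are assumptions, not values from this article:

    import org.apache.zookeeper.ZooKeeper;

    public class ReadTopicAssignment {
        public static void main(String[] args) throws Exception {
            // Connect string and session timeout are placeholder assumptions
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {});
            // /brokers/topics/[topic] holds the raw assignment JSON shown above
            byte[] data = zk.getData("/brokers/topics/my-topic", false, null);
            System.out.println(new String(data, "UTF-8"));
            zk.close();
        }
    }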
  • Partition state info

   /brokers/topics/[topic]/partitions/[0...N]  where [0...N] is the partition index

  /brokers/topics/[topic]/partitions/[partitionId]/state

  
  
  Schema:
  {
    "controller_epoch": number of controller elections in the kafka cluster,
    "leader": brokerId of the elected leader of this partition,
    "version": version number, default 1,
    "leader_epoch": number of leader elections for this partition,
    "isr": [brokerId list of the in-sync replica set]
  }
  Example:
  {
    "controller_epoch": 1,
    "leader": 2,
    "version": 1,
    "leader_epoch": 0,
    "isr": [2, 1]
  }
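  Since every leader election rewrites this state znode (bumping leader_epoch), watching it is an easy way to observe leader changes. A sketch under the same placeholder connect string and topic name; note that ZooKeeper watches are one-shot, so real code would re-register:

    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.ZooKeeper;

    public class WatchPartitionState {
        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {});
            String path = "/brokers/topics/my-topic/partitions/0/state";
            // Register a one-shot data watch: it fires once when a new leader
            // election rewrites the state JSON
            byte[] state = zk.getData(path, event -> {
                if (event.getType() == EventType.NodeDataChanged) {
                    System.out.println("partition state changed: " + event.getPath());
                }
            }, null);
            System.out.println(new String(state, "UTF-8"));
            Thread.sleep(Long.MAX_VALUE); // keep the session alive to receive the event
        }
    }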

  • Broker registration info

   /brokers/ids/[0...N]                 

  Each broker must be given a numeric id in its configuration file (globally unique). This node is an ephemeral znode (EPHEMERAL), so it is removed automatically when the broker's ZooKeeper session ends.

  
  
  Schema:
  {
    "jmx_port": JMX port number,
    "timestamp": timestamp of when this kafka broker first started,
    "host": hostname or IP address,
    "version": version number, default 1,
    "port": the kafka broker's service port, taken from the port parameter in server.properties
  }
  Example:
  {
    "jmx_port": 6061,
    "timestamp": "1403061899859",
    "version": 1,
    "host": "192.168.1.148",
    "port": 9092
  }
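  Because the registration znode is EPHEMERAL, it vanishes as soon as the owning broker's session ends. The following sketch mimics that pattern against a test ZooKeeper with no live broker 0; the id and connect string are made-up stand-ins, and the payload reuses the example values above:

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    public class EphemeralRegistration {
        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {});
            String json = "{\"jmx_port\":6061,\"timestamp\":\"1403061899859\","
                        + "\"host\":\"192.168.1.148\",\"version\":1,\"port\":9092}";
            // EPHEMERAL: the znode is deleted automatically when this session closes,
            // which is exactly how a dead broker vanishes from /brokers/ids
            zk.create("/brokers/ids/0", json.getBytes("UTF-8"),
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println(zk.getChildren("/brokers/ids", false));
            zk.close(); // session ends -> /brokers/ids/0 is removed
        }
    }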

  •  Controller epoch

   /controller_epoch -> int (epoch)   

This value is an integer. It starts at 1 when the first broker in the kafka cluster boots for the first time. Whenever the broker hosting the center controller changes or dies, a new center controller is elected, and each such change increments controller_epoch by 1.
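The znode body is simply the epoch rendered as text, so reading it is a one-line parse; a minimal sketch under the same placeholder connect string:

    import org.apache.zookeeper.ZooKeeper;

    public class ReadControllerEpoch {
        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {});
            // The znode value is the epoch integer as a plain string
            int epoch = Integer.parseInt(
                    new String(zk.getData("/controller_epoch", false, null), "UTF-8"));
            System.out.println("controller elections so far: " + epoch);
            zk.close();
        }
    }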

  •  Controller registration info

/controller -> int (broker id of the controller)  stores the information of the kafka broker on which the center controller currently runs

  
  
  Schema:
  {
    "version": version number, default 1,
    "brokerid": the broker's unique id within the kafka cluster,
    "timestamp": timestamp of when the controller changed to this kafka broker
  }
  Example:
  {
    "version": 1,
    "brokerid": 3,
    "timestamp": "1403061802981"
  }
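Since every controller failover rewrites /controller, a watch on that znode is enough to observe it. A sketch; exists() is used so the watch also covers the window while the znode is absent between elections:

    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.ZooKeeper;

    public class WatchController {
        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {});
            // exists() registers a watch even if the node is currently missing
            zk.exists("/controller", event -> {
                if (event.getType() == EventType.NodeCreated
                        || event.getType() == EventType.NodeDeleted
                        || event.getType() == EventType.NodeDataChanged) {
                    System.out.println("controller moved: " + event.getType());
                }
            });
            Thread.sleep(Long.MAX_VALUE); // watches are one-shot; a real tool re-registers
        }
    }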

  • Consumer registration info

Each consumer has a unique ID (the consumerId can be set in the configuration file or generated by the system); this id identifies the consumer.

/consumers/[groupId]/ids/[consumerIdString]

This is an ephemeral znode (its name follows the consumerIdString generation rule below); its value records the topics + partitions this consumer is currently consuming.

consumerId generation rule:

  
  
  String consumerUuid;
  if (config.consumerId != null) {
      // Use the id given in the config file as-is
      consumerUuid = config.consumerId;
  } else {
      // Otherwise generate one: hostname-timestamp-8 hex chars of a random UUID
      UUID uuid = UUID.randomUUID();
      consumerUuid = String.format("%s-%d-%s",
              InetAddress.getLocalHost().getHostName(),
              System.currentTimeMillis(),
              Long.toHexString(uuid.getMostSignificantBits()).substring(0, 8));
  }
  String consumerIdString = config.groupId + "_" + consumerUuid;
   
   
  Schema:
  {
    "version": version number, default 1,
    "subscription": {   // list of subscribed topics
      "topic name": number of consumer threads for this topic
    },
    "pattern": "static",
    "timestamp": "timestamp of when the consumer started"
  }
  Example:
  {
    "version": 1,
    "subscription": {"open_platform_opt_push_plus1": 5},
    "pattern": "static",
    "timestamp": "1411294187842"
  }
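  Putting the naming rule and the subscription schema together, an old-style (pre-0.9, ZooKeeper-based) consumer registers by writing this JSON into an ephemeral id node. A sketch with made-up group and consumer names, assuming the parent path /consumers/my-group/ids already exists:

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    public class RegisterConsumer {
        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {});
            // Name follows the consumerIdString rule: groupId_host-millis-hex8 (made up)
            String path = "/consumers/my-group/ids/my-group_host1-1411294187842-bcbb2527";
            String json = "{\"version\":1,\"subscription\":{\"open_platform_opt_push_plus1\":5},"
                        + "\"pattern\":\"static\",\"timestamp\":\"1411294187842\"}";
            // Ephemeral: the registration disappears with this consumer's session,
            // which is what lets the rest of the group notice a dead member
            zk.create(path, json.getBytes("UTF-8"),
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            Thread.sleep(10_000); // pretend to consume, then exit -> node is removed
        }
    }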

  • Consumer owner

   /consumers/[groupId]/owners/[topic]/[partitionId] -> consumerIdString + threadId index

   When a consumer starts, it triggers the following actions:

    • First, it performs "Consumer Id registration";
    • Next, it registers a watch under the "Consumer id registry" node to listen for other consumers in the same group "leaving" or "joining"; any change to the child list under this znode path triggers a rebalance of the consumers in this group (for example, when one consumer fails, the others take over its partitions); a watch-registration sketch follows this list;
    • It also registers a watch under the "Broker id registry" node to monitor broker liveness; if the broker list changes, consumers in all groups rebalance.
  • Consumer offset

   /consumers/[groupId]/offsets/[topic]/[partitionId] -> long (offset)

  Tracks the largest offset this consumer group has consumed so far in each partition.

  This znode is persistent. Note that the offset is keyed by group_id, so when one consumer in a consumer group fails and a rebalance is triggered, another consumer can pick up consuming where it left off.
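  Since the znode body is just the offset rendered as text, reading and committing it are plain getData/setData calls; a sketch with placeholder group and topic names:

    import org.apache.zookeeper.ZooKeeper;

    public class ZkOffsets {
        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {});
            String path = "/consumers/my-group/offsets/my-topic/0";
            // Read the last committed offset for this group/topic/partition
            long offset = Long.parseLong(
                    new String(zk.getData(path, false, null), "UTF-8"));
            System.out.println("resume from offset " + offset);
            // "Commit" a new offset; version -1 overwrites regardless of znode version
            zk.setData(path, Long.toString(offset + 100).getBytes("UTF-8"), -1);
            zk.close();
        }
    }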

  • Re-assign partitions

  /admin/reassign_partitions

  
  
  Schema:
  {
    "fields": [
      { "name": "version", "type": "int", "doc": "version id" },
      { "name": "partitions",
        "type": {
          "type": "array",
          "items": {
            "fields": [
              { "name": "topic",     "type": "string", "doc": "topic of the partition to be reassigned" },
              { "name": "partition", "type": "int",    "doc": "the partition to be reassigned" },
              { "name": "replicas",  "type": "array",  "items": "int", "doc": "a list of replica ids" }
            ]
          }
        },
        "doc": "an array of partitions to be reassigned to new replicas"
      }
    ]
  }
  Example:
  {
    "version": 1,
    "partitions": [
      { "topic": "Foo", "partition": 1, "replicas": [0, 1, 3] }
    ]
  }
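  This znode is normally written by the kafka-reassign-partitions.sh tool, but creating it by hand shows the mechanism: the controller watches the path, performs the reassignment, and deletes the znode when it finishes. A sketch using the example payload above:

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    public class TriggerReassignment {
        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {});
            String json = "{\"version\":1,\"partitions\":"
                        + "[{\"topic\":\"Foo\",\"partition\":1,\"replicas\":[0,1,3]}]}";
            // The controller picks up this request and removes the znode when done
            zk.create("/admin/reassign_partitions", json.getBytes("UTF-8"),
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            zk.close();
        }
    }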

  •  Preferred replica election

   /admin/preferred_replica_election

 

  
  
  Schema:
  {
    "fields": [
      { "name": "version", "type": "int", "doc": "version id" },
      { "name": "partitions",
        "type": {
          "type": "array",
          "items": {
            "fields": [
              { "name": "topic",     "type": "string", "doc": "topic of the partition for which preferred replica election should be triggered" },
              { "name": "partition", "type": "int",    "doc": "the partition for which preferred replica election should be triggered" }
            ]
          }
        },
        "doc": "an array of partitions for which preferred replica election should be triggered"
      }
    ]
  }
  Example:
  {
    "version": 1,
    "partitions": [
      { "topic": "Foo", "partition": 1 },
      { "topic": "Bar", "partition": 0 }
    ]
  }
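  The same create-a-request-znode pattern applies here (normally driven by the kafka-preferred-replica-election.sh tool); only the path and payload differ from the reassignment sketch:

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    public class TriggerPreferredElection {
        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {});
            // Payload follows the schema above; topic/partition are placeholders
            String json = "{\"version\":1,\"partitions\":[{\"topic\":\"Foo\",\"partition\":1}]}";
            zk.create("/admin/preferred_replica_election", json.getBytes("UTF-8"),
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            zk.close();
        }
    }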

  • Delete topics

  /admin/delete_topics

  
  
  Schema:
  {
    "fields": [
      { "name": "version", "type": "int", "doc": "version id" },
      { "name": "topics",
        "type": { "type": "array", "items": "string", "doc": "an array of topics to be deleted" }
      }
    ]
  }
  Example:
  {
    "version": 1,
    "topics": ["foo", "bar"]
  }
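  A sketch that files a deletion request by writing the JSON above into /admin/delete_topics. This follows the schema shown here; the topic names are the example's placeholders, and the exact payload handling is an assumption to verify against your Kafka version:

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    public class RequestTopicDeletion {
        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {});
            // Request payload per the schema above
            String json = "{\"version\":1,\"topics\":[\"foo\",\"bar\"]}";
            zk.create("/admin/delete_topics", json.getBytes("UTF-8"),
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            zk.close();
        }
    }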

  • Topic configuration

  /config/topics/[topic_name]

  
  
  {
    "version": 1,
    "config": {
      "config.a": "x",
      "config.b": "y",
      ...
    }
  }
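  A final sketch reads a topic's override config back out; the topic name is a placeholder, and in practice these overrides are written by Kafka's admin tooling rather than by hand:

    import org.apache.zookeeper.ZooKeeper;

    public class ReadTopicConfig {
        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {});
            // Prints the {"version":1,"config":{...}} JSON described above
            byte[] cfg = zk.getData("/config/topics/my-topic", false, null);
            System.out.println(new String(cfg, "UTF-8"));
            zk.close();
        }
    }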