从kafka和zookeeper中获取生产和消费偏移量

时间:2024-09-30 16:20:30

从kafka和zookeeper中获取生产和消费偏移量

  • 特殊说明

    • 该命令由 Python 编写并编译打包而成，需要在 CentOS 7 系统上使用。
  • 命令详情

[root@mongodb_1 get_offsets_num]# ./get_offsets_num -h
usage: get_offsets_num [-h] [-k KAFKA_HOST] [-z ZOOKEEPER_HOST]
                       [-m INTERVAL_MINUTES]

Usage of argparse

optional arguments:
  -h, --help            show this help message and exit
  -k KAFKA_HOST, --kafka_host KAFKA_HOST
                        需要输入kafka:端口
  -z ZOOKEEPER_HOST, --zookeeper_host ZOOKEEPER_HOST
                        需要输入zookeeper:端口
  -m INTERVAL_MINUTES, --Interval_minutes INTERVAL_MINUTES
                        间隔分钟
  • 命令执行
[root@mongodb_1 get_offsets_num]# ./get_offsets_num_v2.py  -k 10.130.25.77:9092 -z 10.130.25.79:2181  
Interval 1 minutes sleep
=======================================================================================
kafka offsets: agent 2574552 2574552
zookeeper offsets: agent 2574552 2574552
agent kafka offsets num: 0 storm offsets num: 0 Actual consumption: 0
=======================================================================================
kafka offsets: record 89110 89110
zookeeper offsets: record 89110 89110
record kafka offsets num: 0 storm offsets num: 0 Actual consumption: 0
=======================================================================================
  • 代码详情
#!/usr/local/python3/bin/python3
import os, time,json,argparse
from kazoo.client import KazooClient
from kafka3 import KafkaConsumer, TopicPartition


def get_zoo_consumer_info(Topology):
    """Return the consumed offset Storm stored in ZooKeeper for a topology.

    Reads the JSON node ``/stormOffset/<Topology>/partition_0`` through the
    module-level ``zk_cli`` client (created in the ``__main__`` section) and
    returns its ``"offset"`` field.

    Args:
        Topology: Storm topology name, e.g. ``"agentTopology"``.

    Returns:
        The stored offset. Returns 0 when the node does not exist (a message
        is printed) or when its JSON has no ``"offset"`` key, so callers can
        always do arithmetic on the result.
    """
    # Make sure the client is started before reading (called on every lookup).
    zk_cli.start()
    path = "/stormOffset/" + Topology + "/partition_0"
    if not zk_cli.exists(path):
        print("Path " + path + " does not exist.")
        return 0

    raw_data, _stat = zk_cli.get(path)
    node = json.loads(raw_data)
    offset = node.get("offset")
    # Without this guard a node lacking "offset" yields None and the caller's
    # subtraction raises TypeError; treat it like a missing node instead.
    return 0 if offset is None else offset

def get_kafka_consumer_info(server, topic):
    """Return the end offset of partition 0 of ``topic``.

    Args:
        server: the ``KafkaConsumer`` created in the ``__main__`` section;
            only its ``end_offsets()`` query is used.
        topic: Kafka topic name, e.g. ``"agent"``.

    Returns:
        The value ``server.end_offsets()`` reports for ``(topic, 0)``.
        Only partition 0 is inspected.
    """
    topic_partition = TopicPartition(topic, 0)
    end_offsets_by_tp = server.end_offsets([topic_partition])
    return end_offsets_by_tp[topic_partition]


if __name__ == '__main__':
    # ---- command-line options ------------------------------------------
    parser = argparse.ArgumentParser(description='Usage of argparse')
    parser.add_argument('-k', '--kafka_host', type=str, default="10.130.25.77:9092", help='需要输入kafka:端口')
    parser.add_argument('-z', '--zookeeper_host', type=str, default="10.130.25.79:2181", help='需要输入zookeeper:端口')
    parser.add_argument('-m', '--Interval_minutes', type=int, default=1, help='间隔分钟')

    args = parser.parse_args()
    kafka_host = args.kafka_host
    zookeer_host = args.zookeeper_host
    Interval_minutes = args.Interval_minutes
    if Interval_minutes < 0:
        # time.sleep() rejects negative values; fail early with a usage error.
        parser.error("--Interval_minutes must be >= 0")

    # The i-th Kafka topic is paired with the i-th Storm topology whose
    # ZooKeeper node records how far that topic has been consumed.
    Kafka_production_topics = "agent,record"
    Zoo_consumption_topics = "agentTopology,recordTopology"
    topic_pairs = list(zip(Kafka_production_topics.split(","),
                           Zoo_consumption_topics.split(",")))
    separator = "======================================================================================="

    # ---- clients --------------------------------------------------------
    # Continuing without a client would only end in a NameError later, so
    # abort with the same message (SystemExit prints it and exits with 1).
    try:
        # Must stay a module-level name: get_zoo_consumer_info() reads it.
        zk_cli = KazooClient(hosts=zookeer_host)
    except Exception as e:
        raise SystemExit("init zookeeper conn error: " + str(e))

    try:
        kafka_server = KafkaConsumer(bootstrap_servers=kafka_host)
    except Exception as e:
        raise SystemExit("init kafka conn error: " + str(e))

    try:
        # ---- first sample ---------------------------------------------
        kafka_offset = {}
        zoo_offset = {}
        for kafka_topic, zoo_topic in topic_pairs:
            kafka_offset[kafka_topic] = get_kafka_consumer_info(kafka_server, kafka_topic)
            zoo_offset[zoo_topic] = get_zoo_consumer_info(zoo_topic)

        print("Interval " + str(Interval_minutes) + " minutes sleep")
        print(separator)
        time.sleep(Interval_minutes * 60)

        # ---- second sample and per-topic deltas -----------------------
        for kafka_topic, zoo_topic in topic_pairs:
            get_kafka_offset_num = get_kafka_consumer_info(kafka_server, kafka_topic)
            last_kafka_num = kafka_offset[kafka_topic]
            minutes_kafka_offset_num = get_kafka_offset_num - last_kafka_num

            get_zoo_offset_num = get_zoo_consumer_info(zoo_topic)
            last_zoo_num = zoo_offset[zoo_topic]
            minutes_zoo_offset_num = get_zoo_offset_num - last_zoo_num

            # Growth of the Kafka end offset minus growth of Storm's stored
            # offset over the interval.
            Difference = minutes_kafka_offset_num - minutes_zoo_offset_num
            print("kafka offsets:", kafka_topic, get_kafka_offset_num, last_kafka_num)
            # Labelled with the Kafka topic name so both lines share a key.
            print("zookeeper offsets:", kafka_topic, get_zoo_offset_num, last_zoo_num)
            print(kafka_topic + " kafka offsets num: " + str(minutes_kafka_offset_num) + " storm offsets num: " + str(minutes_zoo_offset_num) + " Actual consumption: " + str(Difference))
            print(separator)
    finally:
        # Release both connections even if a lookup or the sleep raised
        # (including Ctrl+C). The kazoo client is stopped, then closed.
        zk_cli.stop()
        zk_cli.close()
        # Close the consumer connection.
        kafka_server.close()