使用zookeeper获取brokers的信息

时间:2021-05-28 08:25:16

使用zookeeper获取brokers的信息

Table of Contents

1 在ArchLinux上安装libzookeeper

yaourt -S libzookeeper

这样会安装最新版本3.4.7-1, 由于 3.4客户端兼容3.3 server,所以应该问题不大。如果不幸,可以下载源代码自己编译。

2 include 头文件

上面的方式会将头文件安装到/usr/include/zookeeper/目录

#include <zookeeper/zookeeper.h>#include <zookeeper/zookeeper.jute.h>

3 现在编写代码

读取zookeeper的例子来源于librdkafka 0.8分支中的rdkafka_zookeeper_example.c, 为了和C++代码一起使用做了修改。下面是main函数的部分代码。

int main (int argc, char **argv) {  /*   * Process kill signal, quit from the loop   */  signal(SIGINT, sigterm);  signal(SIGTERM, sigterm);  /*   * Create configuration objects   */  RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);  RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);  /** Initialize zookeeper */  string zookeeper("localhost:2181");  zhandle_t * zh = initialize_zookeeper(zookeeper);  /*   * Set configuration properties   */  char brokers[1024];  set_brokerlist_from_zookeeper(zh, brokers);  string errstr;  cout << "brokers from zookeeper is: " << brokers << endl;  global_conf->set("metadata.broker.list", brokers, errstr);

几个C函数代码实现

#include "librdkafka/rdkafka.h"  /* for Kafka driver */

#include <zookeeper/zookeeper.h>
#include <zookeeper/zookeeper.jute.h>
#include <jansson.h>

#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <list>
#include <string>

using std::string;
using std::list;
using std::cout;
using std::endl;

static bool run = true;
static bool exit_eof = true;
static rd_kafka_t *rk;

/* ZooKeeper znode whose children are the ids of the registered brokers. */
#define BROKER_PATH "/brokers/ids"

/**
 * Build a comma-separated "host:port" broker list from ZooKeeper.
 *
 * Lists the children of BROKER_PATH, reads each child znode, parses its
 * payload as JSON and appends the "host" / "port" members to `brokers`.
 * Children that cannot be read or parsed are skipped.
 *
 * @param zzh           ZooKeeper handle; when NULL the list is left empty.
 * @param brokers       Output buffer. Always NUL-terminated on return (it is
 *                      the empty string when nothing could be read).
 * @param brokers_size  Capacity of `brokers` in bytes. Defaults to 1024, the
 *                      size of the buffers used by the existing callers; pass
 *                      the real size when using a different buffer.
 */
static void set_brokerlist_from_zookeeper(zhandle_t *zzh, char *brokers,
                                          std::size_t brokers_size = 1024) {
  if (brokers == NULL || brokers_size == 0) {
    return;
  }
  /* Callers print the buffer unconditionally, so never leave it undefined. */
  brokers[0] = '\0';
  if (zzh == NULL) {
    return;
  }

  struct String_vector brokerlist;
  /* watch = 1 asks the server to notify the handle's watcher on changes. */
  if (zoo_get_children(zzh, BROKER_PATH, 1, &brokerlist) != ZOK) {
    fprintf(stderr, "No brokers found on path %s\n", BROKER_PATH);
    return;
  }

  std::size_t used = 0;  /* bytes of `brokers` already filled, excluding NUL */
  for (int i = 0; i < brokerlist.count; i++) {
    char path[255];
    const int path_len =
        snprintf(path, sizeof(path), BROKER_PATH "/%s", brokerlist.data[i]);
    if (path_len < 0 || static_cast<std::size_t>(path_len) >= sizeof(path)) {
      continue;  /* child name does not fit; skip rather than overflow */
    }

    char cfg[1024];
    /* Offer one byte less than the buffer so the terminator always fits. */
    int len = static_cast<int>(sizeof(cfg)) - 1;
    if (zoo_get(zzh, path, 0, cfg, &len, NULL) != ZOK || len <= 0) {
      continue;  /* read failed or znode has no data */
    }
    cfg[len] = '\0';

    json_error_t jerror;
    json_t *jobj = json_loads(cfg, 0, &jerror);
    if (jobj == NULL) {
      continue;
    }

    bool out_of_space = false;
    /* json_string_value() yields NULL when "host" is absent or not a string. */
    const char *host = json_string_value(json_object_get(jobj, "host"));
    json_t *jport = json_object_get(jobj, "port");
    if (host != NULL && json_is_integer(jport)) {
      const int port = static_cast<int>(json_integer_value(jport));
      const std::size_t remaining = brokers_size - used;
      /* Emit the separator *before* every entry but the first, so skipped
       * brokers can never leave a dangling comma or an unterminated string. */
      const int written = snprintf(brokers + used, remaining, "%s%s:%d",
                                   used > 0 ? "," : "", host, port);
      if (written < 0 || static_cast<std::size_t>(written) >= remaining) {
        brokers[used] = '\0';  /* discard the partially written entry */
        fprintf(stderr, "Broker list truncated: buffer of %lu bytes is too small\n",
                static_cast<unsigned long>(brokers_size));
        out_of_space = true;
      } else {
        used += static_cast<std::size_t>(written);
      }
    }
    json_decref(jobj);

    if (out_of_space) {
      break;
    }
  }

  deallocate_String_vector(&brokerlist);
  printf("Found brokers %s\n", brokers);
}

/**
 * ZooKeeper watcher callback.
 *
 * On a child event below BROKER_PATH, re-reads the broker list and, when it
 * is non-empty and the file-scope producer handle `rk` is set, hands the list
 * to librdkafka. Re-reading the list also requests a new watch, because
 * set_brokerlist_from_zookeeper() calls zoo_get_children() with watch = 1.
 */
static void watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {
  (void)state;
  (void)watcherCtx;

  if (type != ZOO_CHILD_EVENT || path == NULL) {
    return;
  }
  if (strncmp(path, BROKER_PATH, sizeof(BROKER_PATH) - 1) != 0) {
    return;
  }

  char brokers[1024];
  set_brokerlist_from_zookeeper(zh, brokers, sizeof(brokers));
  if (brokers[0] != '\0' && rk != NULL) {
    rd_kafka_brokers_add(rk, brokers);
    rd_kafka_poll(rk, 10);
  }
}

/**
 * Create a ZooKeeper handle for the given "host:port[,host:port...]" string,
 * registering `watcher` as the global watcher with a 10000 ms session timeout.
 *
 * Terminates the process with exit status 1 when no handle can be created.
 * NOTE(review): a non-NULL handle presumably does not guarantee the session
 * is already connected — confirm against the ZooKeeper C client docs.
 */
static zhandle_t* initialize_zookeeper(string const& zookeeper) {
  zhandle_t *zh = zookeeper_init(zookeeper.c_str(), watcher, 10000, 0, 0, 0);
  if (zh == NULL) {
    fprintf(stderr, "Zookeeper connection not established.");
    exit(1);
  }
  return zh;
}

4 运行程序

可以看到程序打印出从zookeeper中读取到的broker列表, 各broker之间用逗号(,)隔开。

brokers from zookeeper is: localhost.localdomain:9093,localhost.localdomain:9094

5 后续要解决

读取topic的partition等信息,能够自动获取所有partition的消息。

Created: 2016-05-02 Mon 11:04

Validate