使用zookeeper获取brokers的信息
Table of Contents
1 在ArchLinux上安装libzookeeper
yaourt -S libzookeeper
这样会安装最新版本3.4.7-1, 由于 3.4客户端兼容3.3 server,所以应该问题不大。如果不幸,可以下载源代码自己编译。
2 include 头文件
上面的方式会将头文件安装到/usr/include/zookeeper/目录
#include <zookeeper/zookeeper.h>#include <zookeeper/zookeeper.jute.h>
3 现在编写代码
读取zookeeper的例子来源于rdkafkazookeeperexample.c(0.8 branch), 为了和C++代码一起使用做了修改。 这是main函数的部份代码。
int main (int argc, char **argv) { /* * Process kill signal, quit from the loop */ signal(SIGINT, sigterm); signal(SIGTERM, sigterm); /* * Create configuration objects */ RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); /** Initialize zookeeper */ string zookeeper("localhost:2181"); zhandle_t * zh = initialize_zookeeper(zookeeper); /* * Set configuration properties */ char brokers[1024]; set_brokerlist_from_zookeeper(zh, brokers); string errstr; cout << "brokers from zookeeper is: " << brokers << endl; global_conf->set("metadata.broker.list", brokers, errstr);
几个C函数代码实现
#include "librdkafka/rdkafka.h" /* for Kafka driver */#include <zookeeper/zookeeper.h>#include <zookeeper/zookeeper.jute.h>#include <jansson.h>using std::string;using std::list;using std::cout;using std::endl;static bool run = true;static bool exit_eof = true;static rd_kafka_t *rk;#define BROKER_PATH "/brokers/ids"static void set_brokerlist_from_zookeeper(zhandle_t *zzh, char *brokers) { if (zzh) { struct String_vector brokerlist; if (zoo_get_children(zzh, BROKER_PATH, 1, &brokerlist) != ZOK) { fprintf(stderr, "No brokers found on path %s\n", BROKER_PATH); return; } int i; char *brokerptr = brokers; for (i = 0; i < brokerlist.count; i++) { char path[255], cfg[1024]; sprintf(path, "/brokers/ids/%s", brokerlist.data[i]); int len = sizeof(cfg); zoo_get(zzh, path, 0, cfg, &len, NULL); if (len > 0) { cfg[len] = '\0'; json_error_t jerror; json_t *jobj = json_loads(cfg, 0, &jerror); if (jobj) { json_t *jhost = json_object_get(jobj, "host"); json_t *jport = json_object_get(jobj, "port"); if (jhost && jport) { const char *host = json_string_value(jhost); const int port = json_integer_value(jport); sprintf(brokerptr, "%s:%d", host, port); brokerptr += strlen(brokerptr); if (i < brokerlist.count - 1) { *brokerptr++ = ','; } } json_decref(jobj); } } } deallocate_String_vector(&brokerlist); printf("Found brokers %s\n", brokers); }}static void watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx){ char brokers[1024]; if (type == ZOO_CHILD_EVENT && strncmp(path, BROKER_PATH, sizeof(BROKER_PATH) - 1) == 0) { brokers[0] = '\0'; set_brokerlist_from_zookeeper(zh, brokers); if (brokers[0] != '\0' && rk != NULL) { rd_kafka_brokers_add(rk, brokers); rd_kafka_poll(rk, 10); } }}static zhandle_t* initialize_zookeeper(string const& zookeeper) { zhandle_t * zh = zookeeper_init(zookeeper.c_str(), watcher, 10000, 0, 0, 0); if (zh == NULL) { fprintf(stderr, "Zookeeper connection not established."); exit(1); } return zh;}
4 运行程序
可以看到打印出从zookeeper中读取到的broker list, broker之间用,隔开
brokers from zookeeper is: localhost.localdomain:9093,localhost.localdomain:9094
5 后续要解决
读取topic的partition等信息,能够自动获取所有partition的消息。
Created: 2016-05-02 Mon 11:04