写了一个storm集成kfaka的程序,kafkaSpout消费的数据作为storm的数据源。运行报错如下:
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions
at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) ~[storm-kafka-0.9.5.jar:0.9.5]at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.KafkaSpout.open(KafkaSpout.java:87) ~[storm-kafka-0.9.5.jar:0.9.5]
at backtype.storm.daemon.executor$fn__4249$fn__4264.invoke(executor.clj:522) ~[storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__458.invoke(util.clj:461) ~[storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_80]
Caused by: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions
at storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:94) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:65) ~[storm-kafka-0.9.5.jar:0.9.5]
... 7 common frames omitted
原因:
有代码如下:
public static StormTopology buildTopo() {
BrokerHosts brokerHosts = new ZkHosts("hdp01:2181");
SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "topic1", "/kafka", "kafkaspout");
spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new KafkaSpout(spoutConfig));
在zookeeper的根目录下没有必要的节点/kafka
解决方法:
在zookeeper的根目录下创建节点/kafka