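Storm is a stream-processing framework and Kafka is a message queue. In production the two are often used together: Kafka buffers the messages and Storm reads them from Kafka. Streaming messages may be produced at an uneven rate, so buffering them in Kafka turns a bursty stream into a steadier one that Storm can consume at its own pace.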
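To make the buffering idea concrete, here is a minimal plain-Java sketch. It is a toy model, not Kafka or Storm code: an in-memory queue stands in for the Kafka topic, messages arrive in bursts, and the consumer takes at most a fixed number per tick. The names `BufferSmoothingSketch`, `drain`, `arrivalsPerTick` and `maxPerTick` are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model: a queue absorbs bursts so the consumer sees a steady rate.
class BufferSmoothingSketch {

    // Returns how many messages the consumer handled on each tick.
    static List<Integer> drain(int[] arrivalsPerTick, int maxPerTick) {
        if (maxPerTick <= 0) {
            throw new IllegalArgumentException("maxPerTick must be positive");
        }
        Deque<Integer> buffer = new ArrayDeque<>(); // stands in for the Kafka topic
        List<Integer> consumedPerTick = new ArrayList<>();
        int nextId = 0;
        int tick = 0;
        // Keep ticking until all bursts have arrived and the buffer is empty.
        while (tick < arrivalsPerTick.length || !buffer.isEmpty()) {
            int arriving = tick < arrivalsPerTick.length ? arrivalsPerTick[tick] : 0;
            for (int i = 0; i < arriving; i++) {
                buffer.addLast(nextId++); // producer side: bursty writes
            }
            int consumed = 0;
            while (consumed < maxPerTick && !buffer.isEmpty()) {
                buffer.pollFirst(); // consumer side: bounded reads per tick
                consumed++;
            }
            consumedPerTick.add(consumed);
            tick++;
        }
        return consumedPerTick;
    }

    public static void main(String[] args) {
        // Two bursts of six messages, consumed two at a time.
        System.out.println(drain(new int[] {6, 0, 0, 6, 0, 0}, 2)); // prints [2, 2, 2, 2, 2, 2]
    }
}
```

The arrivals are spiky (6, 0, 0, 6, 0, 0) but the consumer handles two messages on every tick, which is the smoothing effect described above. A real deployment would also have to consider how large the backlog can grow.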
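The code below passes messages produced on Kafka's "test" topic to Storm, and Storm then writes its output to Kafka's "test2" topic. Because this is only an example, Storm does no real processing here (the bolt just wraps each message in a fixed string); in a real scenario the data would be processed inside Storm.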
WordCountTopology.java
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

import java.util.Properties;

public class WordCountTopology {
    private static final String SPLIT_BOLT_ID = "split-bolt";

    public static void main(String[] args) throws Exception {
        // Spout side: locate the Kafka brokers through ZooKeeper and read the "test" topic.
        BrokerHosts brokerHosts = new ZkHosts("localhost:2181", "/kafka/brokers");
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "test", "/kafka", "word-count-topology");
        // Deserialize each Kafka message as a single string.
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // Bolt side: producer settings used when writing back to Kafka.
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        // Setting from the old Scala producer API, kept from the original.
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // The producer settings and the output topic are also placed in the topology config.
        Config conf = new Config();
        conf.put("kafka.broker.properties", properties);
        conf.put("bootstrap.servers", "localhost:9092");
        conf.put("topic", "test2");

        // Wiring: kafka-reader (spout) -> split-bolt -> msgKafkaBolt (writes to Kafka).
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConfig));
        builder.setBolt(SPLIT_BOLT_ID, new TopicMsgBolt()).shuffleGrouping("kafka-reader");
        KafkaBolt<String, String> kafkaBolt = new KafkaBolt<>();
        kafkaBolt.withProducerProperties(properties);
        builder.setBolt("msgKafkaBolt", kafkaBolt).shuffleGrouping(SPLIT_BOLT_ID);

        StormSubmitter.submitTopology("ttt", conf, builder.createTopology());
        Thread.sleep(1000);
    }
}
TopicMsgBolt.java
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class TopicMsgBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // The spout emits one string per Kafka message; wrap it and pass it on.
        String word = input.getString(0);
        String out = "Message got is '" + word + "'!";
        collector.emit(new Values(out));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // The field must be named "message", otherwise the downstream KafkaBolt fails (see below).
        declarer.declare(new Fields("message"));
    }
}
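Originally I wanted to send the messages from the "kafka-reader" KafkaSpout straight to the "msgKafkaBolt" KafkaBolt, but that throws "java.lang.IllegalArgumentException: message does not exist". A bolt has to sit between the two, and its declareOutputFields must declare a "message" field. I have not yet looked into why.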
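One plausible explanation, going by the storm-kafka 1.x defaults as far as I know them: StringScheme declares its single output field as "str", while a KafkaBolt without an explicit mapper falls back to FieldNameBasedTupleToKafkaMapper, which reads the tuple field named "message". A tuple coming straight from the spout has no such field, so the lookup by name fails. The sketch below is a plain-Java model of that lookup, not Storm's code; `FieldLookupSketch`, `valueByField` and `describe` are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for a tuple whose values are looked up by declared field name.
class FieldLookupSketch {

    // Returns the value stored under `field`, or throws if that field was never declared.
    static Object valueByField(List<String> declaredFields, List<Object> values, String field) {
        int index = declaredFields.indexOf(field);
        if (index < 0) {
            throw new IllegalArgumentException(field + " does not exist");
        }
        return values.get(index);
    }

    // Models a sink that always asks for the field named "message".
    static String describe(List<String> declaredFields, List<Object> values) {
        try {
            return "sent: " + valueByField(declaredFields, values, "message");
        } catch (IllegalArgumentException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Tuple shaped like the spout's output: one field called "str".
        System.out.println(describe(Arrays.asList("str"), Arrays.<Object>asList("hello")));
        // Tuple shaped like TopicMsgBolt's output: one field called "message".
        System.out.println(describe(Arrays.asList("message"), Arrays.<Object>asList("hello")));
    }
}
```

The first call reports "failed: message does not exist" and the second "sent: hello", which matches the behavior seen with and without the intermediate bolt. If this is indeed the cause, giving the KafkaBolt a mapper that names the spout's field, for example `withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>("key", "str"))`, should make the intermediate bolt unnecessary. I have not tested that here.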