topology包含:stream、spout、blot。
topology会一直运行,除非进程被杀死。
1、stream
stream=tuple=event(CEP中的)=发送的报文。键值对(一个或多个)。
//spout @Override public void nextTuple() { Utils.sleep(100); String sendStr = "Hello World"; collector.emit(new Values(sendStr)); log.info(sendStr); } /** * 告诉storm集群,spout发送了那些字段 * @param declarer * @return void */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence")); } //blot @Override public void execute(Tuple input) { String test = input.getStringByField("sentence"); if (test == "Hello World") { myCount++; //log.info("发现了1个Hello World!现在的计数值="+Integer.toString(myCount)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("myCount")); }
上例,key=sentence,value=Hello World
2、spout--生产者
抓取数据(连接数据源)转为一个个的tuple,并将tupple发送给blot。
String sendStr = "Hello World";//数据源 collector.emit(new Values(sendStr));//发送给blot
spout不用于业务实现,在多个topology中可以复用。
3、blot--消费者
blot将接受到的tuple,进行计算。可接受多个spout发送的tuple,也可以接受其他blot发送的tuple。如上图。
其可执行的主要功能:
1、过滤tuple 2、join和聚合 3、计算 4、数据库读写