Storm的wordcount实战示例

时间:2023-02-04 20:33:15
有关strom的具体介绍,本文不再过多叙述,不了解的朋友可参考之前的文章 
http://qindongliang.iteye.com/category/361820 
本文主要以一个简单的wordcount例子,来了解下storm应用程序的开发,虽然只是一个简单的例子 
但麻雀虽小,五脏俱全,主要涉及的内容: 

(1)wordcount的拓扑定义 
(2)spout的使用 
(3)bolt的使用 
(4)tick定时器的使用 
(5) bolt之间数据传输的坑 
简单的数据流程图如下: 


Storm的wordcount实战示例


提交到storm集群上的拓扑图: 


Storm的wordcount实战示例



maven项目的pom依赖: 

Java代码   Storm的wordcount实战示例
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"  
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
  5.     <modelVersion>4.0.0</modelVersion>  
  6.   
  7.     <groupId>com.jstrom.demo</groupId>  
  8.     <artifactId>jstrom-test</artifactId>  
  9.     <version>1.0-SNAPSHOT</version>  
  10.   
  11.   
  12.   
  13.     <properties>  
  14.   
  15.         <jstorm.version>2.1.1</jstorm.version>  
  16.         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  
  17.         <slf4j.version>1.7.12</slf4j.version>  
  18.         <joad-time.version>2.9.4</joad-time.version>  
  19.         <storm-kafka.version>0.9.4</storm-kafka.version>  
  20.         <kafka.version>0.9.0.0</kafka.version>  
  21.         <esper.version>5.4.0</esper.version>  
  22.   
  23.   
  24.   
  25.      </properties>  
  26.   
  27.   
  28.   
  29.   
  30.   
  31.   
  32.     <dependencies>  
  33.   
  34.   
  35.         <!-- https://mvnrepository.com/artifact/com.espertech/esper -->  
  36.   
  37.   
  38.   
  39.         <!-- https://mvnrepository.com/artifact/joda-time/joda-time -->  
  40.         <dependency>  
  41.             <groupId>joda-time</groupId>  
  42.             <artifactId>joda-time</artifactId>  
  43.             <version>${joad-time.version}</version>  
  44.         </dependency>  
  45.   
  46.         <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->  
  47.         <dependency>  
  48.             <groupId>org.apache.kafka</groupId>  
  49.             <artifactId>kafka_2.11</artifactId>  
  50.             <version>${kafka.version}</version>  
  51.             <scope>provided</scope>  
  52.         </dependency>  
  53.   
  54.   
  55.         <dependency>  
  56.             <groupId>com.alibaba.jstorm</groupId>  
  57.             <artifactId>jstorm-core</artifactId>  
  58.             <version>${jstorm.version}</version>  
  59.             <scope>provided</scope>  
  60.         </dependency>  
  61.         <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka -->  
  62.         <dependency>  
  63.             <groupId>org.apache.storm</groupId>  
  64.             <artifactId>storm-kafka</artifactId>  
  65.             <version>${storm-kafka.version}</version>  
  66.             <scope>provided</scope>  
  67.         </dependency>  
  68.   
  69.   
  70.   
  71.   
  72.   
  73.         <dependency>  
  74.             <groupId>org.slf4j</groupId>  
  75.             <artifactId>slf4j-jdk14</artifactId>  
  76.             <version>${slf4j.version}</version>  
  77.         </dependency>  
  78.         <dependency>  
  79.             <groupId>org.slf4j</groupId>  
  80.             <artifactId>slf4j-nop</artifactId>  
  81.             <version>${slf4j.version}</version>  
  82.         </dependency>  
  83.   
  84.   
  85.     </dependencies>  
  86.   
  87.     <build>  
  88.         <plugins>  
  89.             <plugin>  
  90.                 <groupId>org.apache.maven.plugins</groupId>  
  91.                 <artifactId>maven-compiler-plugin</artifactId>  
  92.                 <version>2.3.2</version>  
  93.                 <configuration>  
  94.                     <source>1.7</source>  
  95.                     <target>1.7</target>  
  96.                 </configuration>  
  97.             </plugin>  
  98.             <plugin>  
  99.                 <artifactId>maven-assembly-plugin</artifactId>  
  100.                 <configuration>  
  101.                     <archive>  
  102.                         <manifest>  
  103.                             <addClasspath>true</addClasspath>  
  104.                             <mainClass>换成自己的主类</mainClass>  
  105.                         </manifest>  
  106.                     </archive>  
  107.                     <descriptorRefs>  
  108.                         <descriptorRef>jar-with-dependencies</descriptorRef>  
  109.                     </descriptorRefs>  
  110.                 </configuration>  
  111.                 <executions>  
  112.                     <execution>  
  113.                         <id>make-my-jar-with-dependencies</id>  
  114.                         <phase>package</phase>  
  115.                         <goals>  
  116.                             <goal>single</goal>  
  117.                         </goals>  
  118.                     </execution>  
  119.                 </executions>  
  120.             </plugin>  
  121.         </plugins>  
  122.     </build>  
  123.   
  124. </project>  


(1)Topology主拓扑类: 

Java代码   Storm的wordcount实战示例
  1. package com.jstorm.wd;  
  2.   
  3. import backtype.storm.Config;  
  4. import backtype.storm.LocalCluster;  
  5. import backtype.storm.StormSubmitter;  
  6. import backtype.storm.topology.TopologyBuilder;  
  7. import backtype.storm.tuple.Fields;  
  8.   
  9. /** 
  10.  * Created by QinDongLiang on 2016/9/12. 
  11.  */  
  12. public class TopologyWordCount {  
  13.   
  14.     public static void main(String[] args) throws  Exception {  
  15.         TopologyBuilder builder=new TopologyBuilder();  
  16.         //设置数据源  
  17.         builder.setSpout("spout",new CreateSentenceSpout(),1);  
  18.         //读取spout数据源的数据,进行split业务逻辑  
  19.         builder.setBolt("split",new SplitWordBolt(),1).shuffleGrouping("spout");  
  20.         //读取split后的数据,进行count (tick周期10秒)  
  21.         builder.setBolt("count",new SumWordBolt(),1).fieldsGrouping("split",new Fields("word"));  
  22.         //读取count后的数据,进行缓冲打印 (tick周期3秒,仅仅为测试tick使用,所以多加了这个bolt)  
  23.         builder.setBolt("show",new ShowBolt(),1).shuffleGrouping("count");  
  24.         //读取show后缓冲后的数据,进行最终的打印 (实际应用中,最后一个阶段应该为持久层)  
  25.         builder.setBolt("final",new FinalBolt(),1).allGrouping("show");  
  26.   
  27.         Config config=new Config();  
  28.         config.setDebug(false);  
  29.         //集群模式  
  30.         if(args!=null&&args.length>0){  
  31.             config.setNumWorkers(2);  
  32.             StormSubmitter.submitTopology(args[0],config,builder.createTopology());  
  33.         //单机模式  
  34.         }else{  
  35.             config.setMaxTaskParallelism(1);;  
  36.             LocalCluster cluster=new LocalCluster();  
  37.             cluster.submitTopology("word-count",config,builder.createTopology());  
  38.             Thread.sleep(3000000);  
  39.             cluster.shutdown();  
  40.         }  
  41.     }  
  42.   
  43. }  


(2)Spout数据源类 

Java代码   Storm的wordcount实战示例
  1. package com.jstorm.wd;  
  2.   
  3. import backtype.storm.spout.SpoutOutputCollector;  
  4. import backtype.storm.task.TopologyContext;  
  5. import backtype.storm.topology.OutputFieldsDeclarer;  
  6. import backtype.storm.topology.base.BaseRichSpout;  
  7. import backtype.storm.tuple.Fields;  
  8. import backtype.storm.tuple.Values;  
  9. import backtype.storm.utils.Utils;  
  10. import org.joda.time.DateTime;  
  11.   
  12. import java.util.Map;  
  13. import java.util.Random;  
  14.   
  15. /** 
  16.  * Created by QinDongLiang on 2016/8/31. 
  17.  * 创建数据源 
  18.  */  
  19. public class CreateSentenceSpout extends BaseRichSpout {  
  20.     //  
  21.     SpoutOutputCollector collector;  
  22.     Random random;  
  23.     String [] sentences=null;  
  24.   
  25.     @Override  
  26.     public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {  
  27.         this.collector=spoutOutputCollector;//spout_collector  
  28.         random=new Random();//  
  29.         sentences=new String[]{"hadoop hadoop hadoop java java "};  
  30.   
  31.     }  
  32.   
  33.     @Override  
  34.     public void nextTuple() {  
  35.         Utils.sleep(10000);  
  36.         //获取数据  
  37.         String sentence=sentences[random.nextInt(sentences.length)];  
  38.         System.out.println("线程名:"+Thread.currentThread().getName()+"  "+new DateTime().toString("yyyy-MM-dd HH:mm:ss  ")+"10s发射一次数据:"+sentence);  
  39.         //向下游发射数据  
  40.         this.collector.emit(new Values(sentence));  
  41.     }  
  42.   
  43.     @Override  
  44.     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {  
  45.         outputFieldsDeclarer.declare(new Fields("sentence"));  
  46.     }  
  47. }  


(3)Split的bolt类 

Java代码   Storm的wordcount实战示例
  1. package com.jstorm.wd;  
  2.   
  3. import backtype.storm.task.OutputCollector;  
  4. import backtype.storm.task.TopologyContext;  
  5. import backtype.storm.topology.OutputFieldsDeclarer;  
  6. import backtype.storm.topology.base.BaseRichBolt;  
  7. import backtype.storm.tuple.Fields;  
  8. import backtype.storm.tuple.Tuple;  
  9. import backtype.storm.tuple.Values;  
  10.   
  11. import java.util.HashMap;  
  12. import java.util.Map;  
  13.   
  14. /** 
  15.  * 简单的按照空格进行切分后,发射到下一阶段bolt 
  16.  * Created by QinDongLiang on 2016/8/31. 
  17.  */  
  18. public class SplitWordBolt extends BaseRichBolt {  
  19.   
  20.     Map<String,Integer> counts=new HashMap<>();  
  21.   
  22.     private OutputCollector outputCollector;  
  23.   
  24.     @Override  
  25.     public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {  
  26.         this.outputCollector=outputCollector;  
  27.     }  
  28.   
  29.     @Override  
  30.     public void execute(Tuple tuple) {  
  31.         String sentence=tuple.getString(0);  
  32. //        System.out.println("线程"+Thread.currentThread().getName());  
  33. //        简单的按照空格进行切分后,发射到下一阶段bolt  
  34.        for(String word:sentence.split(" ") ){  
  35.            outputCollector.emit(new Values(word));//发送split  
  36.        }  
  37.   
  38.     }  
  39.   
  40.     @Override  
  41.     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {  
  42.         //声明输出的filed  
  43.         outputFieldsDeclarer.declare(new Fields("word"));  
  44.     }  
  45. }  


(4)Sum的bolt类 


Java代码   Storm的wordcount实战示例
  1. package com.jstorm.wd;  
  2.   
  3. import backtype.storm.Config;  
  4. import backtype.storm.task.OutputCollector;  
  5. import backtype.storm.task.TopologyContext;  
  6. import backtype.storm.topology.OutputFieldsDeclarer;  
  7. import backtype.storm.topology.base.BaseRichBolt;  
  8. import backtype.storm.tuple.Fields;  
  9. import backtype.storm.tuple.Tuple;  
  10. import backtype.storm.tuple.Values;  
  11. import backtype.storm.utils.TupleHelpers;  
  12. import backtype.storm.utils.Utils;  
  13. import org.joda.time.DateTime;  
  14. import org.slf4j.Logger;  
  15. import org.slf4j.LoggerFactory;  
  16.   
  17. import java.io.*;  
  18. import java.util.HashMap;  
  19. import java.util.Map;  
  20.   
  21. /** 
  22.  * Created by QinDongLiang on 2016/8/31. 
  23.  */  
  24. public class SumWordBolt extends BaseRichBolt {  
  25.   
  26.     Map<String,Integer> counts=new HashMap<>();  
  27.   
  28.     private OutputCollector outputCollector;  
  29.     final static Logger logger= LoggerFactory.getLogger(SumWordBolt.class);  
  30.     @Override  
  31.     public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {  
  32.         this.outputCollector=outputCollector;  
  33.     }  
  34.   
  35.     @Override  
  36.     public Map<String, Object> getComponentConfiguration() {  
  37.         Map<String, Object> conf = new HashMap<String, Object>();  
  38.         conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);//加入Tick时间窗口,进行统计  
  39.         return conf;  
  40.     }  
  41.   
  42.     public static Object deepCopy(Object srcObj) {  
  43.         Object cloneObj = null;  
  44.         try {  
  45.             ByteArrayOutputStream out = new ByteArrayOutputStream();  
  46.             ObjectOutputStream oo = new ObjectOutputStream(out);  
  47.             oo.writeObject(srcObj);  
  48.   
  49.             ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());  
  50.             ObjectInputStream oi = new ObjectInputStream(in);  
  51.             cloneObj = oi.readObject();  
  52.         } catch(IOException e) {  
  53.             e.printStackTrace();  
  54.         } catch(ClassNotFoundException e) {  
  55.             e.printStackTrace();  
  56.         }  
  57.         return cloneObj;  
  58.     }  
  59.   
  60.     @Override  
  61.     public void execute(Tuple tuple) {  
  62.         //时间窗口定义为10s内的统计数据,统计完毕后,发射到下一阶段的bolt进行处理  
  63.         //发射完成后retun结束,开始新一轮的时间窗口计数操作  
  64.         if(TupleHelpers.isTickTuple(tuple)){  
  65.             System.out.println(new DateTime().toString("yyyy-MM-dd HH:mm:ss")+" 每隔10s发射一次map 大小:"+counts.size());  
  66. //            Map<String,Integer> copyMap= (Map<String, Integer>) deepCopy(counts);  
  67.             outputCollector.emit(new Values(counts));//10S发射一次  
  68. //            counts.clear();  
  69.            counts=new HashMap<>();//这个地方,不能执行clear方法,可以再new一个对象,否则下游接受的数据,有可能为空 或者深度copy也行,推荐new  
  70.             return;  
  71.         }  
  72.   
  73.         //如果没到发射时间,就继续统计wordcount  
  74.         System.out.println("线程"+Thread.currentThread().getName()+"  map 缓冲统计中......  map size:"+counts.size());  
  75.         //String word=tuple.getString(0);//如果有多tick,就不用使用这种方式获取tuple里面的数据  
  76.         String word=tuple.getStringByField("word");  
  77.         Integer count=counts.get(word);  
  78.         if(count==null){  
  79.             count=0;  
  80.         }  
  81.          count++;  
  82.          counts.put(word,count);  
  83.   
  84.   
  85. //        System.out.println(word+" =====>  "+count);  
  86.   
  87.   
  88.   
  89.     }  
  90.   
  91.     @Override  
  92.     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {  
  93.   
  94.         outputFieldsDeclarer.declare(new Fields("word_map"));  
  95.     }  
  96. }  



(5)Show的bolt类 

Java代码   Storm的wordcount实战示例
  1. /** 
  2.  * Created by QinDongLiang on 2016/9/12. 
  3.  */  
  4. public class ShowBolt extends BaseRichBolt {  
  5.   
  6.   
  7.     private  OutputCollector outputCollector;  
  8.   
  9.     @Override  
  10.     public Map<String, Object> getComponentConfiguration() {  
  11.         Map<String, Object> conf = new HashMap<String, Object>();  
  12.         conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 3);//tick时间窗口3秒后,发射到下一阶段的bolt,仅为测试用  
  13.         return conf;  
  14.     }  
  15.   
  16.     @Override  
  17.     public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {  
  18.         this.outputCollector=outputCollector;  
  19.     }  
  20.   
  21.     Map<String,Integer> counts=new HashMap<>();  
  22.   
  23.     @Override  
  24.     public void execute(Tuple tuple) {  
  25.  //tick时间窗口3秒后,发射到下一阶段的bolt,仅为测试用,故多加了这个bolt逻辑  
  26.         if(TupleHelpers.isTickTuple(tuple)){  
  27.             System.out.println(new DateTime().toString("yyyy-MM-dd HH:mm:ss")+"  showbolt间隔  应该是 3 秒后 ");  
  28. //        System.out.println("what: "+tuple.getValue(0)+"  "+tuple.getFields().toList());  
  29.             outputCollector.emit(new Values(counts));  
  30.         return;  
  31.         }  
  32.   
  33.         counts= (Map<String, Integer>) tuple.getValueByField("word_map");  
  34.   
  35.   
  36.   
  37.   
  38.     }  
  39.   
  40.     @Override  
  41.     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {  
  42.   
  43.          outputFieldsDeclarer.declare(new Fields("final_result"));  
  44.     }  
  45. }  


(6)Final的bolt类 

Java代码   Storm的wordcount实战示例
  1. package com.jstorm.wd;  
  2.   
  3. import backtype.storm.task.OutputCollector;  
  4. import backtype.storm.task.TopologyContext;  
  5. import backtype.storm.topology.OutputFieldsDeclarer;  
  6. import backtype.storm.topology.base.BaseRichBolt;  
  7. import backtype.storm.tuple.Tuple;  
  8. import org.joda.time.DateTime;  
  9.   
  10. import java.util.Map;  
  11.   
  12. /** 
  13.  * Created by QinDongLiang on 2016/9/12. 
  14.  * 最终的结果打印bolt 
  15.  */  
  16. public class FinalBolt extends BaseRichBolt {  
  17.   
  18.     @Override  
  19.     public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {  
  20.   
  21.     }  
  22.   
  23.     @Override  
  24.     public void execute(Tuple tuple) {  
  25. //        最终的结果打印bolt  
  26.         System.out.println(new DateTime().toString("yyyy-MM-dd HH:mm:ss")+"  final bolt ");  
  27.         Map<String,Integer> counts= (Map<String, Integer>) tuple.getValue(0);  
  28.         for(Map.Entry<String,Integer> kv:counts.entrySet()){  
  29.             System.out.println(kv.getKey()+"  "+kv.getValue());  
  30.         }  
  31.         //实际应用中,最后一个阶段,大部分应该是持久化到mysql,redis,es,solr或mongodb中  
  32.     }  
  33.   
  34.     @Override  
  35.     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {  
  36.   
  37.     }  
  38. }  


有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 
技术债不能欠,健康债更不能欠, 求道之路,与君同行。 

Storm的wordcount实战示例