The Road to Storm - A WordCount Example

Date: 2023-01-22 16:19:10

I am new to Storm, so please point out anything I have gotten wrong.

I looked at many WordCount examples online, but none of them did quite what I wanted.

Scenario: count the word frequencies of shengjing.txt into a map, and print the whole result in one pass.

● Spout (message source)
  Extend the BaseRichSpout class / implement the IRichSpout interface:
    open — initialization;
    nextTuple — pulls in messages and emits tuples;
    ack — called after a tuple is processed successfully;
    fail — called after a tuple fails;
    declareOutputFields — declares the output fields;

● Bolt (processing unit)
  Extend the BaseBasicBolt / BaseWindowedBolt class, or implement the IRichBolt interface:
    prepare — initialization when the worker starts;
    execute — receives a tuple / tupleWindow, runs the processing logic, and emits results;
    cleanup — called before shutdown;
    declareOutputFields — declares the output fields;

● Project structure

(screenshot: project directory structure)

● pom.xml — declares the project's jar dependencies

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.scps.storm</groupId>
    <artifactId>storm-example</artifactId>
    <version>0.0.1</version>
    <name>storm.example</name>
    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>
</project>

● WordTopology.java — the entry point: instantiates the topology, spout, and bolts, and sets the config

package com.scps.storm.helloword;

import java.util.concurrent.TimeUnit;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt.Duration;
import org.apache.storm.tuple.Fields;

import com.scps.storm.helloword.bolt.SlidingWindowBolt;
import com.scps.storm.helloword.bolt.WordCountBolt;
import com.scps.storm.helloword.bolt.WordFinalBolt;
import com.scps.storm.helloword.bolt.WordSplitBolt;
import com.scps.storm.helloword.spout.WordReaderSpout;

public class WordTopology {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // 1 task reads the file
        builder.setSpout("word-reader", new WordReaderSpout(), 1);
        // 2 tasks split lines into words
        builder.setBolt("word-split", new WordSplitBolt(), 2).shuffleGrouping("word-reader");
        // 2 tasks count in parallel; the same word is always routed to the same task
        builder.setBolt("word-count", new WordCountBolt(), 2).fieldsGrouping("word-split", new Fields("word"));
        // 1 task aggregates: every 3 seconds, process the tuples of the last 5 seconds (sliding window)
        // builder.setBolt("sliding-window-bolt", new SlidingWindowBolt().withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, TimeUnit.SECONDS)), 1).shuffleGrouping("word-count");
        // 1 task aggregates the tuples of each 5-second tumbling window
        // (going much beyond that -- 15 seconds? -- produced timeout errors for me)
        builder.setBolt("sliding-window-bolt", new SlidingWindowBolt().withTumblingWindow(new Duration(5, TimeUnit.SECONDS)), 1).shuffleGrouping("word-count");
        // 1 task prints the final result
        builder.setBolt("word-final", new WordFinalBolt(), 1).shuffleGrouping("sliding-window-bolt");

        Config conf = new Config();
        conf.setDebug(false);

        if (args != null && args.length > 0) {
            // Run on the cluster; build the jar with mvn package first:
            // bin/storm jar "/root/storm-example-0.0.1.jar" com.scps.storm.helloword.WordTopology "http://nimbus:8080/uploads/shengjing.txt" wordcount
            try {
                String file = args[0];
                String name = args[1];
                conf.put("file", file);
                // conf.setNumWorkers(2);
                StormSubmitter.submitTopology(name, conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            // Run directly in Eclipse
            conf.put("file", "C:\\Users\\Administrator\\Downloads\\shengjing1.txt");
            // conf.put("file", "http://192.168.100.170:8080/uploads/shengjing.txt");
            // conf.setMaxTaskParallelism(2); // cap the maximum number of tasks
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("wordcount", conf, builder.createTopology());
        }
    }
}
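In the topology above, fieldsGrouping(..., new Fields("word")) is what lets the two word-count tasks keep independent HashMaps without ever splitting one word's count across tasks: the grouping field's hash picks the task. A minimal plain-Java sketch of that idea (taskFor is my own illustrative helper, not Storm's actual partitioning code):

```java
// Sketch of hash-partitioning on the "word" field, assuming Storm's
// fieldsGrouping behaves like "hash of grouping values modulo task count".
public class FieldsGroupingSketch {

    // Route a word to one of numTasks task indices.
    static int taskFor(String word, int numTasks) {
        // Math.floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(word.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int tasks = 2; // matches setBolt("word-count", new WordCountBolt(), 2)
        // The same word always lands on the same task index, so each task's
        // HashMap holds the complete count for every word it sees.
        System.out.println("\"lord\" -> task " + taskFor("lord", tasks));
        System.out.println("\"lord\" -> task " + taskFor("lord", tasks));
    }
}
```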

● WordReaderSpout.java — reads the txt file and emits one tuple per line

package com.scps.storm.helloword.spout;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class WordReaderSpout implements IRichSpout {

    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector outputCollector;
    private String filePath;
    private boolean completed = false;

    public void ack(Object arg0) {
    }

    public void activate() {
    }

    public void close() {
    }

    public void deactivate() {
    }

    public void fail(Object arg0) {
    }

    @SuppressWarnings("rawtypes")
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        filePath = conf.get("file").toString();
        outputCollector = collector;
    }

    public void nextTuple() {
        if (!completed) {
            String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
            System.out.println("WordReaderSpout nextTuple, " + time);
            String line = "";
            InputStream inputStream = null;
            InputStreamReader inputStreamReader = null;
            BufferedReader reader = null;
            try {
                // filePath = "http://192.168.100.170:8080/uploads/shengjing.txt";
                // filePath = "C:\\Users\\Administrator\\Downloads\\shengjing.txt";
                if (filePath.startsWith("http://")) { // remote file
                    URL url = new URL(filePath);
                    URLConnection urlConn = url.openConnection();
                    inputStream = urlConn.getInputStream();
                } else { // local file
                    inputStream = new FileInputStream(filePath);
                }
                inputStreamReader = new InputStreamReader(inputStream, "utf-8");
                reader = new BufferedReader(inputStreamReader);
                while ((line = reader.readLine()) != null) {
                    outputCollector.emit(new Values(line));
                }
            } catch (MalformedURLException e) {
                e.printStackTrace();
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                completed = true;
                try {
                    if (reader != null) {
                        reader.close();
                    }
                    if (inputStreamReader != null) {
                        inputStreamReader.close();
                    }
                    if (inputStream != null) {
                        inputStream.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        Utils.sleep(20000);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

When testing on the cluster, first upload the txt file through the nimbus UI; the supervisor node that happens to be assigned the spout task then reads the file remotely.

● WordSplitBolt.java — receives lines, splits them, and emits words

package com.scps.storm.helloword.bolt;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordSplitBolt implements IRichBolt {

    private static final long serialVersionUID = 1L;
    private OutputCollector outputCollector;

    @SuppressWarnings("rawtypes")
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        outputCollector = collector;
    }

    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        line = line.trim();
        line = line.replace(",", " ");
        line = line.replace(".", " ");
        line = line.replace(":", " ");
        line = line.replace(";", " ");
        line = line.replace("?", " ");
        line = line.replace("!", " ");
        line = line.replace("(", " ");
        line = line.replace(")", " ");
        line = line.replace("[", " ");
        line = line.replace("]", " ");
        line = line.trim();
        String[] words = line.split(" ");
        for (String word : words) {
            word = word.trim();
            if (!"".equals(word)) {
                outputCollector.emit(new Values(word));
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public void cleanup() {
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
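The chain of replace() calls in execute() can also be written as a single regex split over the same punctuation set. A plain-Java sketch with no Storm dependency (splitLine is a hypothetical helper, not part of the bolt); for ASCII input it should produce the same words:

```java
import java.util.ArrayList;
import java.util.List;

// Regex-based alternative to WordSplitBolt's tokenizing step.
public class SplitSketch {

    static List<String> splitLine(String line) {
        List<String> words = new ArrayList<>();
        // One character class covering the same separators the bolt
        // replaces -- , . : ; ? ! ( ) [ ] -- plus any whitespace.
        for (String word : line.split("[,.:;?!()\\[\\]\\s]+")) {
            if (!word.isEmpty()) { // a leading separator yields an empty token
                words.add(word);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(splitLine("In the beginning, God created the heaven and the earth."));
    }
}
```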

● WordCountBolt.java — receives words, keeps running counts, and emits (word, count) pairs

package com.scps.storm.helloword.bolt;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountBolt implements IRichBolt {

    private static final long serialVersionUID = 1L;
    Map<String, Integer> counter;
    private OutputCollector outputCollector;

    @SuppressWarnings("rawtypes")
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        counter = new HashMap<String, Integer>();
        outputCollector = collector;
    }

    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        int count;
        if (!counter.containsKey(word)) {
            count = 1;
        } else {
            count = counter.get(word) + 1;
        }
        counter.put(word, count);
        outputCollector.emit(new Values(word, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    public void cleanup() {
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
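The containsKey branch in execute() is the classic counter idiom; since Java 8 it collapses into a single Map.merge call. A standalone sketch of just that step (CountSketch is illustrative only, not the bolt itself):

```java
import java.util.HashMap;
import java.util.Map;

// WordCountBolt's counting step rewritten with Map.merge: insert 1 for
// a new key, otherwise add 1 to the stored value, and return the result.
public class CountSketch {

    static final Map<String, Integer> counter = new HashMap<>();

    static int count(String word) {
        return counter.merge(word, 1, Integer::sum);
    }

    public static void main(String[] args) {
        for (String w : new String[] { "lord", "god", "lord" }) {
            System.out.println(w + " : " + count(w));
        }
    }
}
```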

● SlidingWindowBolt.java — receives (word, count) pairs, merges them into one map, and emits the map

package com.scps.storm.helloword.bolt;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;

public class SlidingWindowBolt extends BaseWindowedBolt {

    private static final long serialVersionUID = 1L;
    Map<String, Integer> counter;
    private OutputCollector outputCollector;

    @SuppressWarnings("rawtypes")
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        counter = new HashMap<String, Integer>();
        outputCollector = collector;
    }

    public void execute(TupleWindow inputWindow) {
        String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        System.out.println("SlidingWindowBolt execute, " + time);
        for (Tuple input : inputWindow.get()) {
            String word = input.getStringByField("word");
            int count = input.getIntegerByField("count");
            counter.put(word, count);
        }
        outputCollector.emit(new Values(counter));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("counter"));
    }
}
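The difference between the two setups in WordTopology is that withTumblingWindow hands execute() disjoint batches, while the commented-out withWindow variant produces overlapping ones. A toy count-based batcher illustrating the disjointness (the real bolt windows by time, 5 seconds, not by tuple count; this is only an analogy):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of tumbling windows: consecutive, non-overlapping batches.
public class TumblingSketch {

    static List<List<String>> batches(List<String> tuples, int windowSize) {
        List<List<String>> out = new ArrayList<>();
        for (int i = 0; i < tuples.size(); i += windowSize) {
            // Each batch is disjoint from the next; a sliding window
            // would instead re-include recent tuples in later batches.
            out.add(tuples.subList(i, Math.min(i + windowSize, tuples.size())));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(batches(List.of("a", "b", "c", "d", "e"), 2));
    }
}
```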

● WordFinalBolt.java — receives the map and prints it

package com.scps.storm.helloword.bolt;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

public class WordFinalBolt implements IRichBolt {

    private static final long serialVersionUID = 1L;

    @SuppressWarnings("rawtypes")
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    }

    @SuppressWarnings("unchecked")
    public void execute(Tuple input) {
        Map<String, Integer> counter = (Map<String, Integer>) input.getValueByField("counter");
        List<String> keys = new ArrayList<String>();
        keys.addAll(counter.keySet());
        Collections.sort(keys);
        String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        System.out.println("-----------------begin------------------, " + time);
        for (String key : keys) {
            System.out.println(key + " : " + counter.get(key));
        }
        System.out.println("-----------------end--------------------, " + time);
    }

    public void cleanup() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
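WordFinalBolt sorts the keys with an ArrayList plus Collections.sort; copying the counter into a TreeMap gives the same ordered iteration in one step. A standalone sketch (FinalSketch and its sorted helper are illustrative only):

```java
import java.util.Map;
import java.util.TreeMap;

// TreeMap-based alternative to WordFinalBolt's sorted printout:
// the TreeMap copy iterates its keys in natural (alphabetical) order.
public class FinalSketch {

    static Map<String, Integer> sorted(Map<String, Integer> counter) {
        return new TreeMap<>(counter);
    }

    public static void main(String[] args) {
        Map<String, Integer> counter = Map.of("light", 2, "darkness", 1);
        for (Map.Entry<String, Integer> e : sorted(counter).entrySet()) {
            System.out.println(e.getKey() + " : " + e.getValue());
        }
    }
}
```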

● Project source code: https://pan.baidu.com/s/1mhZtvq4 (password: ypbc)
