初学storm,有不足的地方还请纠正。
网上看了很多wordcount实例,发现都不是我想要的。
实现场景:统计shengjing.txt词频到集合,一次打印结果。
● 消息源Spout
继承BaseRichSpout类 / 实现IRichSpout接口
open,初始化动作;
nextTuple,消息接入,执行数据发射;
ack,tuple成功处理后调用;
fail,tuple处理失败后调用;
declareOutputFields,声明输出字段;
● 处理单元Bolt
继承BaseBasicBolt类 / BaseWindowedBolt / 实现IRichBolt接口
prepare,worker启动时初始化;
execute,接受一个tuple / tupleWindow并执行逻辑处理,发射出去;
cleanup,关闭前调用;
declareOutputFiedls,字段申明;
● 项目结构
● pom.xml文件,配置项目jar依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.scps.storm</groupId>
<artifactId>storm-example</artifactId>
<version>0.0.1</version>
<name>storm.example</name>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
</project>
● WordTopology.java文件,入口类,实例Topology、Spout、Bolt,配置等
package com.scps.storm.helloword; import java.util.concurrent.TimeUnit; import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt.Duration;
import org.apache.storm.tuple.Fields; import com.scps.storm.helloword.bolt.SlidingWindowBolt;
import com.scps.storm.helloword.bolt.WordCountBolt;
import com.scps.storm.helloword.bolt.WordFinalBolt;
import com.scps.storm.helloword.bolt.WordSplitBolt;
import com.scps.storm.helloword.spout.WordReaderSpout; public class WordTopology { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); // 1个task去读文件
builder.setSpout("word-reader", new WordReaderSpout(), 1); // 2个task分割行
builder.setBolt("word-split", new WordSplitBolt(), 2).shuffleGrouping("word-reader"); // 2个task分批统计,并发送相同的word到同一个task
builder.setBolt("word-count", new WordCountBolt(), 2).fieldsGrouping("word-split", new Fields("word")); // 1个task汇总,每隔3秒统计最近5秒的tuple,SlidingWindow滑动窗口(间隔)
// builder.setBolt("sliding-window-bolt", new SlidingWindowBolt().withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, TimeUnit.SECONDS)), 1).shuffleGrouping("word-count");
// 1个task汇总,统计5秒内的tuple,不能超过15秒?提示超时错误,TumblingWindow滚动窗口
builder.setBolt("sliding-window-bolt", new SlidingWindowBolt().withTumblingWindow(new Duration(5, TimeUnit.SECONDS)), 1).shuffleGrouping("word-count"); // 1个task输出
builder.setBolt("word-final", new WordFinalBolt(), 1).shuffleGrouping("sliding-window-bolt"); Config conf = new Config(); conf.setDebug(false); if (args != null && args.length > 0) { // 在集群运行,需要mvn package编译
// bin/storm jar "/root/storm-example-0.0.1.jar" com.scps.storm.helloword.WordTopology "http://nimbus:8080/uploads/shengjing.txt" wordcount try { String file = args[0];
String name = args[1]; conf.put("file", file);
// conf.setNumWorkers(2); StormSubmitter.submitTopology(name, conf, builder.createTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } catch (AuthorizationException e) { e.printStackTrace();
} } else { // 直接在eclipse中运行 conf.put("file", "C:\\Users\\Administrator\\Downloads\\shengjing1.txt");
// conf.put("file", "http://192.168.100.170:8080/uploads/shengjing.txt");
// conf.setMaxTaskParallelism(2); // 设置最大task数
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wordcount", conf, builder.createTopology());
}
}
}
● WordReaderSpout.java文件,读取txt文件,发送行
package com.scps.storm.helloword.spout; import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map; import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils; public class WordReaderSpout implements IRichSpout { private static final long serialVersionUID = 1L;
private SpoutOutputCollector outputCollector;
private String filePath;
private boolean completed = false; public void ack(Object arg0) { } public void activate() { } public void close() { } public void deactivate() { } public void fail(Object arg0) { } @SuppressWarnings("rawtypes")
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { filePath = conf.get("file").toString();
outputCollector = collector;
} public void nextTuple() { if (!completed) { String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
System.out.println("WordReaderSpout nextTuple, " + time); String line = "";
InputStream inputStream = null;
InputStreamReader inputStreamReader = null;
BufferedReader reader = null; try { // filePath = "http://192.168.100.170:8080/uploads/shengjing.txt";
// filePath = "C:\\Users\\Administrator\\Downloads\\shengjing.txt"; if (filePath.startsWith("http://")) { // 远程文件
URL url = new URL(filePath);
URLConnection urlConn = url.openConnection();
inputStream = urlConn.getInputStream();
} else { // 本地文件
inputStream = new FileInputStream(filePath);
} inputStreamReader = new InputStreamReader(inputStream, "utf-8");
reader = new BufferedReader(inputStreamReader);
while ((line = reader.readLine()) != null) {
outputCollector.emit(new Values(line));
} } catch (MalformedURLException e) {
e.printStackTrace();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
completed = true;
try {
if (reader != null) {
reader.close();
}
if (inputStreamReader != null) {
inputStreamReader.close();
}
if (inputStream != null) {
inputStream.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
} Utils.sleep(20000);
} public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("line"));
} public Map<String, Object> getComponentConfiguration() { return null;
}
}
使用集群测试时,先把txt文件上传到nimbus的ui里,随机指派supervisor远程读取文件。
● WordSplitBolt.java文件,接收行,分割行,发送词
package com.scps.storm.helloword.bolt; import java.util.Map; import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values; public class WordSplitBolt implements IRichBolt { private static final long serialVersionUID = 1L;
private OutputCollector outputCollector; @SuppressWarnings("rawtypes")
public void prepare(Map conf, TopologyContext context, OutputCollector collector) { outputCollector = collector;
} public void execute(Tuple input) { String line = input.getStringByField("line"); line = line.trim();
line = line.replace(",", " ");
line = line.replace(".", " ");
line = line.replace(":", " ");
line = line.replace(";", " ");
line = line.replace("?", " ");
line = line.replace("!", " ");
line = line.replace("(", " ");
line = line.replace(")", " ");
line = line.replace("[", " ");
line = line.replace("]", " ");
line = line.trim(); String[] words = line.split(" ");
for (String word : words) {
word = word.trim();
if (!"".equals(word)) {
outputCollector.emit(new Values(word));
}
}
} public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word"));
} public void cleanup() { } public Map<String, Object> getComponentConfiguration() { return null;
}
}
● WordCountBolt.java文件,接收词,统计词,发送集合
package com.scps.storm.helloword.bolt; import java.util.HashMap;
import java.util.Map; import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values; public class WordCountBolt implements IRichBolt { private static final long serialVersionUID = 1L;
Map<String, Integer> counter;
private OutputCollector outputCollector; @SuppressWarnings("rawtypes")
public void prepare(Map conf, TopologyContext context, OutputCollector collector) { counter = new HashMap<String, Integer>();
outputCollector = collector;
} public void execute(Tuple input) { String word = input.getStringByField("word");
int count; if (!counter.containsKey(word)) {
count = 1;
} else {
count = counter.get(word) + 1;
} counter.put(word, count);
outputCollector.emit(new Values(word, count));
} public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count"));
} public void cleanup() { } public Map<String, Object> getComponentConfiguration() { return null;
}
}
● SlidingWindowBolt.java文件,接收集合,合并集合,发送集合
package com.scps.storm.helloword.bolt; import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map; import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow; public class SlidingWindowBolt extends BaseWindowedBolt { private static final long serialVersionUID = 1L;
Map<String, Integer> counter;
private OutputCollector outputCollector; @SuppressWarnings("rawtypes")
public void prepare(Map conf, TopologyContext context, OutputCollector collector) { counter = new HashMap<String, Integer>();
outputCollector = collector;
} public void execute(TupleWindow inputWindow) { String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
System.out.println("SlidingWindowBolt execute, " + time); for (Tuple input : inputWindow.get()) { String word = input.getStringByField("word");
int count = input.getIntegerByField("count"); counter.put(word, count);
} outputCollector.emit(new Values(counter));
} public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("counter"));
}
}
● WordFinalBolt.java文件,接收集合,打印集合
package com.scps.storm.helloword.bolt; import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map; import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple; public class WordFinalBolt implements IRichBolt { private static final long serialVersionUID = 1L; @SuppressWarnings("rawtypes")
public void prepare(Map conf, TopologyContext context, OutputCollector collector) { } @SuppressWarnings("unchecked")
public void execute(Tuple input) { Map<String, Integer> counter = (Map<String, Integer>) input.getValueByField("counter");
List<String> keys = new ArrayList<String>();
keys.addAll(counter.keySet());
Collections.sort(keys);
String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
System.out.println("-----------------begin------------------, " + time);
for (String key : keys) {
System.out.println(key + " : " + counter.get(key));
}
System.out.println("-----------------end--------------------, " + time);
} public void cleanup() { } public void declareOutputFields(OutputFieldsDeclarer declarer) { } public Map<String, Object> getComponentConfiguration() { return null;
}
}
● 项目源码文件地址:https://pan.baidu.com/s/1mhZtvq4 密码:ypbc
Storm之路-WordCount-实例的更多相关文章
-
Hadoop3 在eclipse中访问hadoop并运行WordCount实例
前言: 毕业两年了,之前的工作一直没有接触过大数据的东西,对hadoop等比较陌生,所以最近开始学习了.对于我这样第一次学的人,过程还是充满了很多疑惑和不解的,不过我采取的策略是还是先让环 ...
-
Storm系列(二):使用Csharp创建你的第一个Storm拓扑(wordcount)
WordCount在大数据领域就像学习一门语言时的hello world,得益于Storm的开源以及Storm.Net.Adapter,现在我们也可以像Java或Python一样,使用Csharp创建 ...
-
hadoop运行wordcount实例,hdfs简单操作
1.查看hadoop版本 [hadoop@ltt1 sbin]$ hadoop version Hadoop -cdh5.12.0 Subversion http://github.com/cloud ...
-
hadoop2.6.5运行wordcount实例
运行wordcount实例 在/tmp目录下生成两个文本文件,上面随便写两个单词. cd /tmp/ mkdir file cd file/ echo "Hello world" ...
-
执行hadoop自带的WordCount实例
hadoop 自带的WordCount实例可以统计一批文本文件中各单词出现的次数.下面介绍如何执行WordCount实例. 1.启动hadoop [root@hadoop ~]# start-all. ...
-
Python实现MapReduce,wordcount实例,MapReduce实现两表的Join
Python实现MapReduce 下面使用mapreduce模式实现了一个简单的统计日志中单词出现次数的程序: from functools import reduce from multiproc ...
-
wordcount实例
scala的wordcount实例 package com.wondersgroup.myscala import scala.actors.{Actor, Future} import scala. ...
-
Spark源码编译并在YARN上运行WordCount实例
在学习一门新语言时,想必我们都是"Hello World"程序开始,类似地,分布式计算框架的一个典型实例就是WordCount程序,接触过Hadoop的人肯定都知道用MapRedu ...
-
Storm手写WordCount
建立一个maven项目,在pom.xml中进行如下配置: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:x ...
随机推荐
-
总结eclipse中安装maven插件
当自己越来越多的接触到开源项目时,发现大多数的开源项目都是用maven来够建的.并且在开发应用时,也越来越意识到maven的确会解决很多问题,如果你要了解maven,可以参考:Maven入门指南(一) ...
-
Citect:How do I translate Citect error messages?
http://www.opcsupport.com/link/portal/4164/4590/ArticleFolder/51/Citect To decode the error messag ...
-
[TypeScript] Using Lodash in TypeScript with Typings and SystemJS
One of the most confusing parts of getting started with TypeScript is figuring out how to use all th ...
-
HDU4549 M斐波那契数
M斐波那契数列 题目分析: M斐波那契数列F[n]是一种整数数列,它的定义例如以下: F[0] = a F[1] = b F[n] = F[n-1] * F[n-2] ( n > 1 ) 如今给 ...
-
Layer组件多个iframe弹出层打开与关闭及参数传递
一.Layer简介 Layer是一款近年来备受青睐的web弹层组件,基于jquery,易用.实用,兼容包括IE6在内的所有主流浏览器,拥有丰富强大的可自定义的功能. Layer官网地址:http:// ...
-
【转】javascript笔记之apply、call、bind用法
原文地址:https://www.cnblogs.com/coco1s/p/4833199.html apply.call 在 javascript 中,call 和 apply 都是为了改变某个函数 ...
-
redis的配置文件解释
redis的守护进行 守护进程(Daemon Process),也就是通常说的 Daemon 进程(精灵进程),是 Linux 中的后台服务进程.它是一个生存期较长的进程,通常独立 于控制终端并且周期 ...
-
对象何时进入老年代、何时发生full gc
一.对象何时进入老年代 (1)当对象首次创建时, 会放在新生代的eden区, 若没有GC的介入,会一直在eden区, GC后,是可能进入survivor区或者年老代 (2)当对象年龄达到一定的大小 , ...
-
UA池和代理池
scrapy下载中间件 UA池 代理池 一.下载中间件 先祭出框架图: 下载中间件(Downloader Middlewares) 位于scrapy引擎和下载器之间的一层组件. - 作用: (1)引擎 ...
-
第三课 操作系统开发之x86模拟环境搭建
前面我们讲解了主引导程序的加载过程,并且制作了虚拟软盘a.img,最终这个主引导程序也在机器中成功运行了,但是实际开发的时候,并不会如此简单,免不了调试过程,如果还像上一节中直接将软盘放到机器中去加载 ...