上代码:
public class TridentWordCount {
public static class Split extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String sentence = tuple.getString(0);
for (String word : sentence.split(" ")) {
collector.emit(new Values(word));
}
}
} public static StormTopology buildTopology(LocalDRPC drpc) {
//这个是一个batch Spout 一次发3个..
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
new Values("how many apples can you eat"), new Values("to be or not to be the person"));
spout.setCycle(true);//Spout是否循环发送 TridentTopology topology = new TridentTopology();
TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16)//类似于setSpout
.each(new Fields("sentence"),new Split(), new Fields("word"))//setbolt
.groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),new Count(), new Fields("count")).parallelismHint(16); topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields(
"word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"),
new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));
return topology.build();
} public static void main(String[] args) throws Exception {
Config conf = new Config();
conf.setMaxSpoutPending(20);
if (args.length == 0) {
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
for (int i = 0; i < 100; i++) {
System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
Thread.sleep(1000);
}
}
else {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
}
}
}
Storm官方提供的trident单词计数的例子的更多相关文章
-
自定义实现InputFormat、OutputFormat、输出到多个文件目录中去、hadoop1.x api写单词计数的例子、运行时接收命令行参数,代码例子
一:自定义实现InputFormat *数据源来自于内存 *1.InputFormat是用于处理各种数据源的,下面是实现InputFormat,数据源是来自于内存. *1.1 在程序的job.setI ...
-
storm(5)-分布式单词计数例子
例子需求: spout:向后端发送{"sentence":"my dog has fleas"}.一般要连数据源,此处简化写死了. 语句分割bolt(Split ...
-
Strom的trident单词计数代码
/** * 单词计数 */ public class LocalTridentCount { public static class MyBatchSpout implements IBatchSpo ...
-
Storm实现单词计数
package com.mengyao.storm; import java.io.File; import java.io.IOException; import java.util.Collect ...
-
【Storm】storm安装、配置、使用以及Storm单词计数程序的实例分析
前言:阅读笔记 storm和hadoop集群非常像.hadoop执行mr.storm执行topologies. mr和topologies最关键的不同点是:mr执行终于会结束,而topologies永 ...
-
MapReduce之单词计数
最近在看google那篇经典的MapReduce论文,中文版可以参考孟岩推荐的 mapreduce 中文版 中文翻译 论文中提到,MapReduce的编程模型就是: 计算利用一个输入key/value ...
-
2.Storm集群部署及单词统计案例
1.集群部署的基本流程 2.集群部署的基础环境准备 3.Storm集群部署 4.Storm集群的进程及日志熟悉 5.Storm集群的常用操作命令 6.Storm源码下载及目录熟悉 7.Storm 单词 ...
-
大数据【四】MapReduce(单词计数;二次排序;计数器;join;分布式缓存)
前言: 根据前面的几篇博客学习,现在可以进行MapReduce学习了.本篇博客首先阐述了MapReduce的概念及使用原理,其次直接从五个实验中实践学习(单词计数,二次排序,计数器,join,分 ...
-
Storm集群部署及单词技术
1. 集群部署的基本流程 集群部署的流程:下载安装包.解压安装包.修改配置文件.分发安装包.启动集群 注意: 所有的集群上都需要配置hosts vi /etc/hosts 192.168.239.1 ...
随机推荐
-
Python 小爬虫流程总结
接触Python3一个月了,在此分享一下知识点,也算是温故而知新了. 接触python之前是做前端的.一直希望接触面能深一点.因工作需求开始学python,几乎做的都是爬虫..第一个demo就是爬取X ...
-
C#验证码的生成
HTML <ul> <li>验证码:</li> <li> <img src="/Login/GetValidateCode?ID=1&q ...
-
jsp文件引入js文件的方式(项目部署于web容器中)
在页面中引入javascript文件的方式是多种多样的,本文介绍两种. 通过<script>标签插入js文件 通过这种方式引入的js,写对js文件和jsp文件的路径很重要.下面给出一个项目 ...
-
Alwayson+Replication
本文将介绍如何实现Alwayson + Replication ,通过AlwaysOn实现Publication database的高可用性,使Publication database在failove ...
-
winform(ListView及数据库连接)
一.ListView:列表展示数据1.视图 - 在其右上方小箭头点击将视图改为Largelcon:或右键属性在外观View将其改为Details2.设置列头 - 在其右上方小箭头点击选择编辑列,然后添 ...
-
javascript中的迭代器
1.forEach迭代器 forEach方法接收一个函数作为参数,对数组中每个元素使用这个函数,只调用这个函数,数组本身没有任何变化 //forEach迭代器 function square(num) ...
-
jquery.lazyload用法
lazyload是jquery的插件,作为延迟加载图片,减压服务器压力. 如何使用: 先把 <script src="jquery.js" type="text/j ...
-
SqlBulkCopy 类
1.SqlBulkCopy 简介 Microsoft SQL Server 提供一个称为 bcp 的流行的命令提示符实用工具,用于将数据从一个表移动到另一个表(表既可以在同一个服务器上,也可以在不同 ...
-
n! 进制
n! 进制 Time limit per test: 1.0 seconds Time limit all tests: 1.0 seconds Memory limit: 256 megabytes ...
-
[转载] Netty教程
转载自http://blog.csdn.net/kobejayandy/article/details/11493717 先啰嗦两句,如果你还不知道Netty是做什么的能做什么.那可以先简单的搜索了解 ...