WordCount是很多分布式计算中,最常用的例子,例如Hadoop、Storm,Iveely Computing也不例外。明白了WordCount在Iveely Computing上的运行原理,就很容易写出新的分布式程序。上一篇中已经知道了如何部署Iveely Computing以及提交任务,现在我们将深入WordCount的代码。
一、代码结构
图3-1
从图3-1中,可以看出,类WordCount中,有两个子类WordInput、WordOutput,以及一个主方法,WordCount.java即是一个Topology,里面至少包涵一个Input和Output(缺一不可,否则没有意义),以及main函数,main函数依然是Topology的入口函数。
现在问题来了,Input和Output到底是什么关系?还有Topology?
每一个Topology就是一个完整的任务链,可以包含多个Input,多个Output,Input的数据只能传递给一个或多个Output,Output只能将数据传递给一个或多个Output,从而形成一个完整的拓扑结构。
二、Input 深入
Input是数据的产生源,通过类WordInput看下是如何产生数据,并传递给Output的。
public static class WordInput extends IInput { /**
* Output data to collector.
*/
private StreamChannel _channel; /**
* All sample words.
*/
private final String[] _words = new String[] { "welcome", "iveely", "computing", "0.9.0", "build", "by",
"liufanping", "thanks", "github.com" }; private int _index; @Override
public void start(HashMap<String, Object> conf, StreamChannel channel) {
// Here,must be initialize channel.
_channel = channel;
_index = _words.length - 1;
} @Override
public void declareOutputFields(FieldsDeclarer declarer) {
declarer.declare(new String[] { "word" }, new Integer[] { 0 });
} @Override
public void nextTuple() {
if (_index < 0) {
_channel.emitEnd();
} else {
for (int i = 0; i < 100; i++) {
_channel.emit(_words[_index]);
}
_index--;
}
} @Override
public void end() {
System.out.println(getName() + " finished.");
} @Override
public void toOutput() {
_channel.addOutputTo(new WordOutput());
}
}
函数讲解:
start函数 | 在执行此Input之前提前调用的函数,用户初始化等相关工作,类似于构造函数,对有数据输出的时候,一定要初始化channel。 |
declareOutputFields函数 | 用于声明输出的数据信息。 |
nextTuple函数 | 此函数将会被频繁调用,用于输出数据,利用channel.emit提交数据到output。 |
end函数 | 是在Input执行完毕之后,会执行的代码,类似于析构函数。 |
toOutput函数 | 是指定Input的数据输出到的Output。 |
上面代码中,必须注意的几个问题:
2.1 WordInput必须继承IInput。
2.2 Input中,必须在start中初始化channel,因为input一定会产生数据。
2.3 Input中,toOutput函数中,必须指定数据流向。
三、Output深入
Output是数据的处理单元,也可以是新数据的产生单元。
public static class WordOutput extends IOutput { private TreeMap<String, Integer> _map; @Override
public void start(HashMap<String, Object> conf, StreamChannel channel) {
_map = new TreeMap<>();
} @Override
public void declareOutputFields(FieldsDeclarer declarer) {
declarer.declare(new String[] { "word", "totalCount" }, null);
} @Override
public void execute(Tuple tuple) {
String word = tuple.get(0).toString();
if (_map.containsKey(word)) {
int currentCount = _map.get(word);
_map.put(word, currentCount + 1);
} else {
_map.put(word, 1);
}
} @Override
public void end() {
// Output map to database or print.
Iterator<String> it = _map.keySet().iterator();
while (it.hasNext()) {
String key = it.next();
int value = _map.get(key);
System.out.println(getName() + ":" + key + "," + value);
}
} @Override
public void toOutput() { }
}
与Input相比,output中没有nextTuple函数,而是取而代之的execute函数。nextTuple是产生数据,execute是处理数据。如果execute处理完毕之后的数据也需要提交到新的output中去,则需要在execute中利用channel.emit方法提交数据,此刻toOutput中也需要指定数据流向。
此处也需要注意几个问题:
3.1 如果output需要继续传递数据,则需要在start中初始化channel。
3.2 如果当前output接受的数据源来自不同的input,且数据格式不统一,则需要自行判断数据格式,例如传递数组中,第一个用int标识是什么样的数据格式。
四、main函数
main函数,依然是Topology的执行入口,不同的是,它有两种执行方式,一个是本地模式,一个是远程执行模式。本地模式是用于调试用。
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder(true, WordCount.class.getName(), "WordCount");
builder.setInput(new WordInput(), 1);
builder.setOutput(new WordOutput(), 4);
builder.setSlave(2);
TopologySubmitter.submit(builder, args);
}
main函数中,主要做的工作。
4.1 新建TopologyBuilder对象,并在构造函数的第一个参数指定当前是本地模式(true)还是远程模式(false),第二个参数,指定执行的类名,第三个参数,当前Topology的名称。
4.2 设定input和output。并指定运行的数量比(线程)。
4.3 指定在多少个节点上运行(进程)。
4.4 利用TopologySubmitter提交任务即可。
4.5 注意:在生成jar提交到服务器上运行时,一定要将TopologyBuilder的第一个参数改为远程模式(false)。
开源分布式实时计算引擎 Iveely Computing 之 WordCount 详解(3)的更多相关文章
-
开源分布式实时计算引擎 Iveely Computing 之 安装部署(2)
在Github中下载代码和二进制程序中,您都会看到一个bin\iveely computing目录,里面即是Iveely Computing的运行库. 以前总是有 ...
-
开源分布式实时计算引擎 Iveely Computing 之 本地调试Topology(4)
当我们写完一个比较复杂的Topology之后,倘若直接提交到服务器上运行,难免会有很多问题,如何进行本地的调试Topology,是我们非常关心的问题.我们依然以WordCount作为代码示例. 首先, ...
-
JStorm 是一个分布式实时计算引擎
alibaba/jstorm JStorm 是一个分布式实时计算引擎. JStorm 是一个类似Hadoop MapReduce的系统, 用户按照指定的接口实现一个任务,然后将这个任务递交给JStor ...
-
基于Kafka的实时计算引擎如何选择?Flink or Spark?
1.前言 目前实时计算的业务场景越来越多,实时计算引擎技术及生态也越来越成熟.以Flink和Spark为首的实时计算引擎,成为实时计算场景的重点考虑对象.那么,今天就来聊一聊基于Kafka的实时计算引 ...
-
基于Kafka的实时计算引擎如何选择?(转载)
1.前言 目前实时计算的业务场景越来越多,实时计算引擎技术及生态也越来越成熟.以Flink和Spark为首的实时计算引擎,成为实时计算场景的重点考虑对象.那么,今天就来聊一聊基于Kafka的实时计算引 ...
-
一文让你彻底了解大数据实时计算引擎 Flink
前言 在上一篇文章 你公司到底需不需要引入实时计算引擎? 中我讲解了日常中常见的实时需求,然后分析了这些需求的实现方式,接着对比了实时计算和离线计算.随着这些年大数据的飞速发展,也出现了不少计算的框架 ...
-
(第8篇)实时可靠的开源分布式实时计算系统——Storm
摘要: 在Hadoop生态圈中,针对大数据进行批量计算时,通常需要一个或者多个MapReduce作业来完成,但这种批量计算方式是满足不了对实时性要求高的场景.那Storm是怎么做到的呢? 博主福利 给 ...
-
Spark Streaming——Spark第一代实时计算引擎
虽然SparkStreaming已经停止更新,Spark的重点也放到了 Structured Streaming ,但由于Spark版本过低或者其他技术选型问题,可能还是会选择SparkStreami ...
-
《大数据实时计算引擎 Flink 实战与性能优化》新专栏
基于 Flink 1.9 讲解的专栏,涉及入门.概念.原理.实战.性能调优.系统案例的讲解. 专栏介绍 扫码下面专栏二维码可以订阅该专栏 首发地址:http://www.54tianzhisheng. ...
随机推荐
-
thinkphp在模型中自动完成session赋值
相信用过thinkphp的用户都知道thinkphp的模型可以完成很多辅助功能,比 如自动验证.自动完成等,今天在开发中遇到自动完成中需要获取session值 然后自动赋值的功能,具体看代码:clas ...
-
centos 防火墙设置
1.安装iptables防火墙 怎么知道系统是否安装了iptables?执行iptables -V,如果显示如: iptables v1.3.5 说明已经安装了iptables. 如果没有安装ipta ...
-
【转】如何使用PhoneGap打包Web App
如何使用PhoneGap打包Web App 最近做了一款小游戏,定位是移动端访问,思来想去最后选择了jQuery mobile最为框架,制作差不多以后,是否可以打包成App,恰好以前对PhoneGap ...
-
【BZOJ 1033】 [ZJOI2008]杀蚂蚁antbuster
Description 最近,佳佳迷上了一款好玩的小游戏:antbuster.游戏规则非常简单:在一张地图上,左上角是蚂蚁窝,右下角是蛋糕,蚂蚁会源源不断地从窝里爬出来,试图把蛋糕搬回蚂蚁窝.而你的任 ...
-
salesforce apex class call exteral webservice
在项目中需要调用外面的Webservice, 从Salesforce往外写入其他系统.目前一般有两种方法. 1. 根据对方提供的wsdl文件生成apex class,直接实例化后调用其方法(测试成功 ...
-
log4j2 实际使用详解
转载至: https://blog.csdn.net/vbirdbest/article/details/71751835 如下是maven项目中的实例: 首先pom.xml中引入如下依赖,注意看都是 ...
-
HDU5769-Substring-多校#4-1006-后缀数组
给定一个字符x和一个字符串.要求输出包含此字符的所有不同字串. 后缀数组可以计算一个字符串的所有不同字串,理解了原理就能做这题了. 对于每一个后缀i,将产生len-sa[i]-hight[i]的前缀, ...
-
C#:MVC打印PDF文件
在百度上找了许多PDF文件打印,但是符合我需求的打印方式还没看到,所以根据看了https://www.cnblogs.com/TiestoRay/p/3380717.html的范例后,研究了一下,做出 ...
-
【BZOJ1053】[HAOI2007]反素数(搜索)
[BZOJ1053][HAOI2007]反素数(搜索) 题面 BZOJ 洛谷 题解 大力猜一下用不了几个质因子,那么随便爆搜一下就好了. #include<iostream> #inclu ...
-
利用JqGrid结合ashx显示列表之一
最近项目决定运用JqGrid列表控件显示相关数据,以前接触比较多还是easyui和Ext.Net的列表控件,文章简单写的小实例进行一个总结: 1:引入相关的JS及CSS文件,JqGrid目前可以利用J ...