文件名称:第一个Storm应用
文件大小:10KB
文件格式:7Z
更新时间:2019-05-27 10:37:03
Storm
写第一个Storm应用--数单词数量(一个spout读取文本,第一个bolt用来标准化单词,第二个bolt为单词计数) 一、Storm运行模式: 1.本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解) 运行在本地机器的单一JVM上,这个模式主要用来开发、调试。 2.远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个模式中,Storm的所有组件都是线程安全的, 因为它们都会运行在不同的Jvm或物理机器上,这个模式就是正式的生产模式。 二、写一个HelloWord Storm 我们现在创建这么一个应用,统计文本文件中的单词个数,详细学习过Hadoop的朋友都应该写过。 那么我们需要具体创建这样一个Topology,用一个spout负责读取文本文件,用第一个bolt来解析成单词,用第二个bolt来对解析出的单词计数, 整体结构流程所示:Word Storage (words.txt) --> Spout(WordReader.java) --> Bolt(WordNormalizer.java) --> Bolt(WordCounter.java) 可以从这里下载源码:https://github.com/storm-book/examples-ch02-getting_started/zipball/master 三、写一个可运行的Demo很简单,我们只需要三步: 1.创建一个Spout读取数据(数据源) Spout作为数据源,它实现了IRichSpout接口,功能是读取一个文本文件并把它的每一行内容发送给bolt。 2.创建bolt处理数据 创建两个bolt来处理Spout发射出的数据 Spout已经成功读取文件并把每一行作为一个tuple(在Storm数据以tuple的形式传递)发射过来,我们这里需要创建两个bolt分别来负责解析每一行和对单词计数。 Bolt中最重要的是execute方法,每当一个tuple传过来时它便会被调用。 3.创建一个Topology提交到集群 4.运行结果分析 如果你的words.txt文件有如下内容: Storm test are great is an Storm simple application but very powerful really Storm is great 你应该会在日志中看到类似下面的内容: is: 2 application: 1 but: 1 great: 1 test: 1 simple: 1 Storm: 3 really: 1 are: 1 great: 1 an: 1 powerful: 1 very: 1 在这个例子中,每类节点只有一个实例。但是如果你有一个非常大的日志文件呢?你能够很轻松的改变系统中的节点数量实现并行工作。这个时候,你就要创建两个WordCounter实例。 builder.setBolt("word-counter", new WordCounter(),2).shuffleGrouping("word-normalizer"); 程序返回时,你将看到: — 单词数 【word-counter-2】 — application: 1 is: 1 great: 1 are: 1 powerful: 1 Storm: 3 — 单词数 [word-counter-3] — really: 1 is: 1 but: 1 great: 1 test: 1 simple: 1 an: 1 very: 1 棒极了!修改并行度实在是太容易了(当然对于实际情况来说,每个实例都会运行在单独的机器上)。 不过似乎有一个问题:单词is和great分别在每个WordCounter各计数一次。怎么会这样? 当你调用shuffleGrouping时,就决定了Storm会以随机分配的方式向你的bolt实例发送消息。 在这个例子中,理想的做法是相同的单词问题发送给同一个WordCounter实例。 你把shuffleGrouping(“word-normalizer”)换成fieldsGrouping(“word-normalizer”, new Fields(“word”))就能达到目的。 试一试,重新运行程序,确认结果。 你将在后续章节学习更多分组方式和消息流类型。 参考文章 http://blog.csdn.net/suifeng3051/article/details/38369689 http://ifeve.com/getting-started-with-storm-2/
【文件预览】:
Getting-Started
----.project(567B)
----src()
--------test()
--------main()
----target()
--------classes()
--------test-classes()
----.settings()
--------org.eclipse.m2e.core.prefs(90B)
--------org.eclipse.jdt.core.prefs(430B)
--------org.eclipse.core.resources.prefs(119B)
----pom.xml(1KB)
----NOTE.txt(4KB)
----.classpath(1KB)