通过前面6个章节,我们大致了解apache storm的核心细节了,现在我们开始写一些简单的代码,来感受下storm的魅力。
场景——移动呼叫日志分析
移动电话呼叫号及其持续时间将作为Apache stormd的输入流,storm将根据拨号方和接收方之间的电话号码以及通话次数进行分组。
Spout Creation
Spout是storm用于数据生成的一个组件,。
通常,Spout会实现一个IRichSpout接口。
IRichSpout接口有以下重要的方法:
1.open:−提供Spout以及spout的执行环境。executors会运行这个方法来初始化spout。
2.nextTuple:通过收集器发送产生的数据。
3.close−关闭Spout时调用close方法。
4.declareOutputFields:声明输出元组的schema。
5.ack:处理特定的元组
6.fail:指定一个特定的不用处理和再加工元组。
open
open方法签名如下:
open(map conf,TopologyContext context,SpoutOutputCollectorcollector)
参数解析:
conf:为Spout提供storm配置。
context:在topology中提供Spout的完整信息,包括:任务id,输入输出信息。
collector:保证我们发送的数据能被bolt处理。
nextTuple
nextTuple方法签名如下:
nextTuple()
nextTuple()定期方法定期的被相同循环中的ack()方法和fail()方法调用。
当没有工作要做的时候必须释放线程,以保证其他方法有机会被调用。
因此,nextTuple首先要检查处理是否已经完成。
如果完成,在结果返回之前,为了降低处理器的负载,该线程应该至少睡眠1毫秒。
close
Close方法签名如下:
close()
declareOutputFields
declareOutputFields方法前面如下:
declareOutputFields(OutputFieldsDeclarer declarer)
参数说明:
Declarer:声明输出流的ids,输出字段,等等。
这个方法用于指定tuple输出的shema。
ack
ack方法的签名如下:
ack(Object msgId)
这个方法表明指定的tuple已经被处理过。
fail
fail方法签名如下:
fail(Object msgId)
表明spout发送出的数据并没有被完全处理,storm会重新处理这个数据。
FakeCallLogReaderSpout
现在我们要收集手机日志的详细信息,包含:
1.主叫号码
2.被叫号码
3.通话时长
因为我们没有实时的通话记录信息,那么我们就自己模拟通话记录。
Random随机类产生模拟的通话信息。
完整的程序代码如下所示。
Coding − FakeCallLogReaderSpout.java
import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
//Create a class FakeLogReaderSpout which implement IRichSpout interface
to access functionalities
public class FakeCallLogReaderSpout implements IRichSpout {
//Create instance for SpoutOutputCollector which passes tuples to bolt.
private SpoutOutputCollector collector;
private boolean completed = false;
//Create instance for TopologyContext which contains topology data.
private TopologyContext context;
//Create instance for Random class.
private Random randomGenerator = new Random();
private Integer idx = 0;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.context = context;
this.collector = collector;
}
@Override
public void nextTuple() {
if(this.idx <= 1000) {
List<String> mobileNumbers = new ArrayList<String>();
mobileNumbers.add("1234123401");
mobileNumbers.add("1234123402");
mobileNumbers.add("1234123403");
mobileNumbers.add("1234123404");
Integer localIdx = 0;
while(localIdx++ < 100 && this.idx++ < 1000) {
String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
while(fromMobileNumber == toMobileNumber) {
toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
}
Integer duration = randomGenerator.nextInt(60);
this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("from", "to", "duration"));
}
//Override all the interface methods
@Override
public void close() {}
public boolean isDistributed() {
return false;
}
@Override
public void activate() {}
@Override
public void deactivate() {}
@Override
public void ack(Object msgId) {}
@Override
public void fail(Object msgId) {}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Bolt Creation
Bolt是一个以元组作为输入,处理元组后产生新的元组作为输出的组件。
Bolt通常需要实现IRichBolt接口。
在这个程序中,两个bolt类CallLogCreatorBolt和CallLogCounterBolt被用来执行的操作处理。
IRichBolt接口有如下方法:
1.prepare :准备−提供bolt以及bolt的执行环境。
executors会执行这个方法去初始化bolt。
2.execute:处理输入的单个tuple。
3.cleanup :要关闭bolt时被调用。
4.declareOutputFields:声明输出元组的schema。
Prepare
prepare方法的签名如下:
prepare(Map conf, TopologyContext context, OutputCollector collector)
参数说明:
Conf:为bolt提供配置
Context:在topology中提供完整的bolt位置信息,包括它的任务id,输入和输出信息等。
Collector:保证处理过的tuple能被发送出去。
Execute
execute方法签名如下:
execute(Tuple tuple)//这里的tuple是将被处理的输入tuple
execute方法每次处理一个元组。
通过tuple的getValue方法访问元组的数据。
输入元组不是必须立即处理的,可以过一会再处理。
可以处理多个元组,处理后产生单个tuple作为输出tuple。
处理过的tuple可以使用OutputCollector类发送出去。
Cleanup
cleanup方法签名如下:
Cleanup()
declareOutputFields
declareOutputFields方法签名如下:
declareOutputFields(OutputFieldsDeclarer declarer)//这里的declarer用来声明输出流的ids,输出字段等信息
这个方法用来指定tuple的输出shema。
Call log Creator Bolt
Call log creator bolt 接收通话日志tuple。
通话日志tuple包含主叫号码,被叫号码和通话时长。
This bolt simply creates a new value by combining the caller number and the receiver number.
这个螺栓简单地创建了一个新值通过调用者数量和接收方号码。
格式化后的新值字段叫call,格式是”Caller number – Receiver number”
完整的代码如下所示:
//import util packages
import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
//Create instance for OutputCollector which collects and emits tuples to produce output
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String from = tuple.getString(0);
String to = tuple.getString(1);
Integer duration = tuple.getInteger(2);
collector.emit(new Values(from + " - " + to, duration));
}
@Override
public void cleanup() {}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call", "duration"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Call log Counter Bolt
Call log counter bolt 接收上一个bolt call及其持续时间作为一个tuple。
在bolt的prepare方法中初始化一个字典(Map)对象。
在execute 方法中,它检查字典中的tuple并为tuple中的每一个新的“call”值创建一个条目entry,并设置字典的值为1。
对于字典中现有的条目,则将其值+1。
简而言之,这个bolt在字典中保存call和它的数量
如果不保存在字典中,我们也可以把它保存懂啊一个数据源中。
而不是保存调用及其计数在字典里,我们也可以将它保存到一个数据源。
完整的程序代码如下:
Coding − CallLogCounterBolt.java
import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class CallLogCounterBolt implements IRichBolt {
Map<String, Integer> counterMap;
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.counterMap = new HashMap<String, Integer>();
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String call = tuple.getString(0);
Integer duration = tuple.getInteger(1);
if(!counterMap.containsKey(call)){
counterMap.put(call, 1);
}else{
Integer c = counterMap.get(call) + 1;
counterMap.put(call, c);
}
collector.ack(tuple);
}
@Override
public void cleanup() {
for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
System.out.println(entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Creating Topology
通常storm的topology是一个Thrift结构。
TopologyBuilder类提供了简单易用的方法来创建复杂的topology。
TopologyBuilder类提供了方法来设spout(setSpout)和bolt(setBolt)。
总之,TopologyBuilder用 createTopology创建topology。
下面的代码片段为创建topology的事例:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("call-log-reader-spout");
builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()).fieldsGrouping("call-log-creator-bolt", new Fields("call"));
shuffleGrouping和fieldsGrouping方法帮助spout和bolt对stream进行分组。
Local Cluster
为了便于开发,我们可以使用“LocalCluster”对象创建一个本地集群,然后使用“LocalCluster”类的“submitTopology”方法提交topology。
其中,“submitTopology”的参数之一是“Config”类的一个实例。
“Config”类的作用是提交topology之前设置配置选项。
This configuration option will be merged with the cluster configuration at run time and sent to all task (spout and bolt) with the prepare method.
这种配置选项将合并在运行时间和发送到所有集群配置任务(壶嘴和螺栓)的准备方法。
一旦topology被提交到集群,我们需要等待10秒以便集群计算提交的topology,然后使用”shutdown”方法关闭集群。
完整的程序代码如下:
Coding − LogAnalyserStorm.java
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
public static void main(String[] args) throws Exception{
//Create Config instance for cluster configuration
Config config = new Config();
config.setDebug(true);
//
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
.shuffleGrouping("call-log-reader-spout");
builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
.fieldsGrouping("call-log-creator-bolt", new Fields("call"));
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
Thread.sleep(10000);
//Stop the topology
cluster.shutdown();
}
}
Building and Running the Application
完整的应用程序有四个Java代码:
1.FakeCallLogReaderSpout.java
2.CallLogCreaterBolt.java
3.CallLogCounterBolt.java
4.LogAnalyerStorm.java
应用程序可以使用下面的命令构建:
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
应用程序可以使用以下的命令运行:
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm
Output
一旦应用程序启动,它将输出完整的集群启动进程的细节,和pout和bolt处理过程,最后,集群关闭这些处理过程。
在 "CallLogCounterBolt"代码中,我们打印了call和count的具体信息。
这些信息将显示在控制台,如下:
1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93
JVM外的其他语言
Storm topology通过Thrift接口实现,这使得很容易任何语言去提交topology到storm集群中。
Storm支持Ruby、Python和许多其他语言。
让我们看看使用python事例:
Python Binding
Python是一种解释型,交互的、面向对象的高级编程语言。
Storm支持用Python实现其topology。
Python支持 emitting, anchoring, acking, and logging operations。
如你所知,bolt可以使用任何语言定义。
Bolts written in another language are executed as sub-processes, and Storm communicates with those sub-processes with JSON messages over stdin/stdout.
下面来看一个用python编写的bolt来计算单词出现次数的事例:
public static class WordCount implements IRichBolt {
public WordSplit() {
super("python", "splitword.py");
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
Here the class WordCount implements the IRichBolt interface and running with python implementation specified super method argument "splitword.py".
现在创建一个名为“splitword.py”的python实现。
import storm
class WordCountBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
for word in words:
storm.emit([word])
WordCountBolt().run()
这是Python实现计数的示例。
同样你也可以用其他支持的语言实现。