storm七之storm java示例

时间:2021-04-12 06:24:44

通过前面6个章节,我们大致了解apache storm的核心细节了,现在我们开始写一些简单的代码,来感受下storm的魅力。

场景——移动呼叫日志分析

移动电话呼叫号及其持续时间将作为Apache stormd输入,storm根据拨号方和接收方之间的电话号码以及通话次数进行分组

 

 

 

Spout Creation

Spoutstorm用于数据生成一个组件,。

通常,Spout实现一个IRichSpout接口。

IRichSpout接口有以下重要的方法

1.open−提供Spout以及spout执行环境。executors运行这个方法来初始化spout

2.nextTuple通过收集器发送产生的数据

3.close关闭Spout时调用close方法

4.declareOutputFields声明输出元组schema

5.ack处理特定的元组

6.fail指定一个特定的不处理和再加工元组。

 

open

open方法签名如下:

open(map conf,TopologyContext context,SpoutOutputCollectorcollector)

参数解析:

confSpout提供storm配置。

context:topology中提供Spout的完整信息,包括:任务id,输入输出信息

collector:保证我们发送的数据能被bolt处理

 

nextTuple

nextTuple方法签名如下:

nextTuple()

nextTuple()定期方法定期的被相同循环中的ack()方法和fail()方法调用

当没有工作要做的时候必须释放线程,以保证其他方法有机会被调用。

因此,nextTuple首先要检查处理是否已经完成。

如果完成,在结果返回之前,为了降低处理器的负载,该线程应该至少睡1毫秒。

 

 

close

Close方法签名如下:

close()

declareOutputFields

declareOutputFields方法前面如下:

declareOutputFields(OutputFieldsDeclarer declarer)

参数说明:

Declarer声明输出流ids,输出字段,等等。

这个方法用于指定tuple输出的shema

 

ack

ack方法的签名如下:

ack(Object msgId)

这个方法表明指定的tuple已经被处理过。

 

fail

fail方法签名如下:

fail(Object msgId)

表明spout发送出的数据并没有被完全处理,storm会重新处理这个数据。

FakeCallLogReaderSpout

现在我们要收集手机日志的详细信息,包含:

1.主叫号码

2.被叫号码

3.通话时长

因为我们没有实时通话记录信息,那么我们就自己模拟通话记录。

Random随机类产生模拟的通话信息。

完整的程序代码如下所示。

Coding FakeCallLogReaderSpout.java

 

 

import java.util.*;

//import storm tuple packages

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

 

//import Spout interface packages

import backtype.storm.topology.IRichSpout;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;

 

//Create a class FakeLogReaderSpout which implement IRichSpout interface

   to access functionalities

public class FakeCallLogReaderSpout implements IRichSpout {

   //Create instance for SpoutOutputCollector which passes tuples to bolt.

   private SpoutOutputCollector collector;

   private boolean completed = false;

   //Create instance for TopologyContext which contains topology data.

   private TopologyContext context;

   //Create instance for Random class.

   private Random randomGenerator = new Random();

   private Integer idx = 0;

 

   @Override

   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

      this.context = context;

      this.collector = collector;

   }

 

   @Override

   public void nextTuple() {

      if(this.idx <= 1000) {

         List<String> mobileNumbers = new ArrayList<String>();

         mobileNumbers.add("1234123401");

         mobileNumbers.add("1234123402");

         mobileNumbers.add("1234123403");

         mobileNumbers.add("1234123404");

 

         Integer localIdx = 0;

         while(localIdx++ < 100 && this.idx++ < 1000) {

            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));

            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));

            while(fromMobileNumber == toMobileNumber) {

               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));

            }

            Integer duration = randomGenerator.nextInt(60);

            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));

         }

      }

   }

 

   @Override

   public void declareOutputFields(OutputFieldsDeclarer declarer) {

      declarer.declare(new Fields("from", "to", "duration"));

   }

 

   //Override all the interface methods

   @Override

   public void close() {}

 

   public boolean isDistributed() {

      return false;

   }

 

   @Override

   public void activate() {}

 

   @Override

   public void deactivate() {}

 

   @Override

   public void ack(Object msgId) {}

 

   @Override

   public void fail(Object msgId) {}

 

   @Override

   public Map<String, Object> getComponentConfiguration() {

      return null;

   }

}

 

 

Bolt Creation

Bolt是一个以元组作为输入处理元组产生新的元组作为输出的组件

Bolt通常需要实现IRichBolt接口。

在这个程序中,两个boltCallLogCreatorBolt和CallLogCounterBolt用来执行的操作处理

 

IRichBolt接口有如下方法:

1.prepare 准备−提供bolt以及bolt执行环境。

executors会执行这个方法去初始化bolt

2.execute处理输入的单个tuple

3.cleanup 要关闭bolt时被调用

4.declareOutputFields声明输出元组schema

Prepare

prepare方法的签名如下:

prepare(Map conf, TopologyContext context, OutputCollector collector)

参数说明:

Conf:为bolt提供配置

Context:topology提供完整bolt位置信息,包括它的任务id,输入和输出信息等。

Collector:保证处理过的tuple能被发送出去。

Execute

execute方法签名如下:

execute(Tuple tuple)//这里的tuple是将被处理的输入tuple

execute方法每次处理一个元组。

通过tuple的getValue方法访问元组的数据。

输入元组不是必须立即处理的,可以过一会再处理

可以处理多个元组处理后产生单个tuple作为输出tuple

处理过的tuple可以使用OutputCollector类发送出去

 

Cleanup

cleanup方法签名如下:

Cleanup()

 

declareOutputFields

declareOutputFields方法签名如下:

declareOutputFields(OutputFieldsDeclarer declarer)//这里的declarer用来声明输出流的ids,输出字段等信息

这个方法用来指定tuple的输出shema

Call log Creator Bolt

Call log creator bolt 接收通话日志tuple

通话日志tuple包含主叫号码,被叫号码和通话时长

 

This bolt simply creates a new value by combining the caller number and the receiver number.

这个螺栓简单地创建了一个新值通过调用者数量和接收方号码。

格式化后的新值字段叫call,格式是Caller number – Receiver number

完整的代码如下所示:

//import util packages

import java.util.HashMap;

import java.util.Map;

 

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

 

//import Storm IRichBolt package

import backtype.storm.topology.IRichBolt;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.tuple.Tuple;

 

//Create a class CallLogCreatorBolt which implement IRichBolt interface

public class CallLogCreatorBolt implements IRichBolt {

   //Create instance for OutputCollector which collects and emits tuples to produce output

   private OutputCollector collector;

 

   @Override

   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

      this.collector = collector;

   }

 

   @Override

   public void execute(Tuple tuple) {

      String from = tuple.getString(0);

      String to = tuple.getString(1);

      Integer duration = tuple.getInteger(2);

      collector.emit(new Values(from + " - " + to, duration));

   }

 

   @Override

   public void cleanup() {}

 

   @Override

   public void declareOutputFields(OutputFieldsDeclarer declarer) {

      declarer.declare(new Fields("call", "duration"));

   }

   @Override

   public Map<String, Object> getComponentConfiguration() {

      return null;

   }

}

 

Call log Counter Bolt

Call log counter bolt 接收上一个bolt call及其持续时间作为一个tuple

boltprepare方法中初始化一个字典(Map)对象。

execute 方法,它检查字典中的tuple并为tuple中的每一个新的call”值创建一个条目entry,并设置字典的值为1

对于字典中现有的条目,则将其值+1

简而言之,这个bolt在字典中保存call和它的数量

如果不保存在字典中,我们也可以把它保存懂啊一个数据源中。

而不是保存调用及其计数在字典里,我们也可以将它保存到一个数据源。

完整的程序代码如下

Coding − CallLogCounterBolt.java

import java.util.HashMap;

import java.util.Map;

 

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.tuple.Tuple;

 

public class CallLogCounterBolt implements IRichBolt {

   Map<String, Integer> counterMap;

   private OutputCollector collector;

 

   @Override

   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

      this.counterMap = new HashMap<String, Integer>();

      this.collector = collector;

   }

 

   @Override

   public void execute(Tuple tuple) {

      String call = tuple.getString(0);

      Integer duration = tuple.getInteger(1);

      if(!counterMap.containsKey(call)){

         counterMap.put(call, 1);

      }else{

         Integer c = counterMap.get(call) + 1;

         counterMap.put(call, c);

      }

      collector.ack(tuple);

   }

 

   @Override

   public void cleanup() {

      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){

         System.out.println(entry.getKey()+" : " + entry.getValue());

      }

   }

 

   @Override

   public void declareOutputFields(OutputFieldsDeclarer declarer) {

      declarer.declare(new Fields("call"));

   }

   @Override

   public Map<String, Object> getComponentConfiguration() {

      return null;

   }

}

 

 

Creating Topology

通常stormtopology是一个Thrift结构。

TopologyBuilder类提供了简单易用的方法来创建复杂的topology

TopologyBuilder类提供了方法来设spout(setSpout)和bolt(setBolt)。

总之,TopologyBuilder createTopology创建topology

下面的代码片段为创建topology的事例:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()).fieldsGrouping("call-log-creator-bolt", new Fields("call"));

 

shuffleGrouping和fieldsGrouping方法帮助spoutboltstream进行分组。

 

Local Cluster

为了便于开发,我们可以使用“LocalCluster”对象创建一个本地集群,然后使用“LocalCluster”类的“submitTopology”方法提交topology

其中,“submitTopology”的参数之一是Config”类的一个实例。

Config”类的作用是提交topology之前设置配置选项。

 

This configuration option will be merged with the cluster configuration at run time and sent to all task (spout and bolt) with the prepare method.

这种配置选项将合并在运行时间和发送到所有集群配置任务(壶嘴和螺栓)的准备方法。

一旦topology提交到集群,我们需要等待10秒以便集群计算提交topology,然后使用shutdown方法关闭集群。

完整的程序代码如下

Coding − LogAnalyserStorm.java

 

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

//import storm configuration packages

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.topology.TopologyBuilder;

 

//Create main class LogAnalyserStorm submit topology.

public class LogAnalyserStorm {

   public static void main(String[] args) throws Exception{

      //Create Config instance for cluster configuration

      Config config = new Config();

      config.setDebug(true);

      //

      TopologyBuilder builder = new TopologyBuilder();

      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

 

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())

         .shuffleGrouping("call-log-reader-spout");

 

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())

         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

      LocalCluster cluster = new LocalCluster();

      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());

      Thread.sleep(10000);

      //Stop the topology

      cluster.shutdown();

   }

}

 

Building and Running the Application

完整的应用程序有四个Java代码

1.FakeCallLogReaderSpout.java
2.CallLogCreaterBolt.java
3.CallLogCounterBolt.java
4.LogAnalyerStorm.java

 

应用程序可以使用下面的命令构建:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

应用程序可以使用以下的命令运行:

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

Output

一旦应用程序启动,它将输出完整的集群启动进程的细节,poutbolt处理过程,最后,集群关闭这些处理过程

 "CallLogCounterBolt"代码中,我们打印了callcount的具体信息。

这些信息将显示在控制台如下

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

JVM外的其他语言

Storm topology通过Thrift接口实现,这使得很容易任何语言去提交topologystorm集群中

Storm支持Ruby、Python和许多其他语言。

让我们看看使用python事例:

Python Binding

Python是一种解释,交互的、面向对象的高级编程语言。

Storm支持Python实现其topology

Python支持 emitting, anchoring, acking, and logging operations

如你所知,bolt可以使用任何语言定义

 

Bolts written in another language are executed as sub-processes, and Storm communicates with those sub-processes with JSON messages over stdin/stdout.

下面来看一个用python编写的bolt来计算单词出现次数的事例:

 

public static class WordCount implements IRichBolt {
public WordSplit() {
super("python", "splitword.py");
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

 

Here the class WordCount implements the IRichBolt interface and running with python implementation specified super method argument "splitword.py".

现在创建一个名为“splitword.py”的python实现。

import storm
class WordCountBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
for word in words:
storm.emit([word])
WordCountBolt().run()

 

这是Python实现计数的示例。

同样你也可以其他支持语言实现