storm入门--storm的第一个示例

时间:2022-06-21 06:24:43

本文介绍在eclipse下开发storm的第一个示例。本文的示例基于storm提供的example中的ExclamationTopology,位于(examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java)。本文使用的storm版本为storm-1.0.2。本文将在eclipse在进行代码的编写和调试,并最终将代码打成jar包,提交到Linux环境下的storm集群。

拓扑简介
源码ExclamationTopology示例包括一个spout和两个bolt。

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("word", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

spout位于包org.apache.storm.testing.TestWordSpout中,由storm提供,不需要读者编写。其主要作用是不间断的发送一个单词。源码中其发送单词的代码为:

//每隔100ms会从words数组中随机的发送一个单词
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}

bolt的定义位于ExclamationTopology.java中,两个bolt的作用相同,都是从数据流中取出tuple,然后再其后加上”!!!”,并再次发送。
spout和bolt都使用了可靠的消息传递机制,保证了消息被“完全处理”。

代码编写
新建java project名为 ExclamationTopology,并创建class,package名为org.apache.storm.starter,将ExclamationTopology.java的内容拷贝到新建的class中。
在project中的src/的同级新建lib/文件夹,将storm安装包根目录下的lib/文件夹下的所有jar包拷贝到新建工程的lib/目录下,并在eclipse中选中所有的jar包右键选择build path-> add to build path

本文修改ExclamationBolt(),新建 StarBolt(),两者的差别仅仅在于后者将在取到的元组后追加”*“,而非”!!!”
修改后的代码如下:

//ExclamationTopology.java
package org.apache.storm.starter;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;

// This is a basic example of a Storm topology.
public class ExclamationTopology {

public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;

@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}

@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}

}

public static class StarBolt extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}

@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "***"));
_collector.ack(tuple);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("wordstar"));
}

}


public static void main(String[] args) throws Exception {

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("word", new TestWordSpout(), 4); //该spout的ID为"word",spout具有4个执行器,并默认分配4个任务
builder.setBolt("exclaim", new ExclamationBolt(), 2) //该bolt的ID为"exclaim",从ID为"word"的spout中接收数据流,bolt具有2个执行器
.setNumTasks(4) //该bolt分配4个任务,平均每个执行器2个任务
.shuffleGrouping("word"); //使用随机分组的方式
builder.setBolt("star", new StarBolt(), 2) //该bolt的ID为"star",从ID为"exclaim"的bolt中接收数据流,bolt具有2个执行器,默认分配2个任务,平均每个执行器1个任务
.shuffleGrouping("exclaim");

Config conf = new Config();
conf.setDebug(true);

if (args != null && args.length > 0) {
//生产环境
conf.setNumWorkers(2); //设置工作进程数为2个
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
} else {
//本地开发环境
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
}

}
}

这里着重区分几个概念:
工作进程:一个工作进程执行拓扑的一个子集;
执行器(线程):执行器是由工作进程产生的一个线程,用于执行实际的任务;
任务:在拓扑中真正负责数据处理。

在生成环境中启用了2个工作进程,spout以及bolt共有8(4+2+2)个执行器(工作线程),10(4+4+2)个任务,那么每个工作进程拥有8/2个执行器,每个工作进程分配10/2个任务,其中ExclamationBolt的每个执行器会执行两个任务,其余每个执行器执行一项任务。

拓扑提交
将ExclamationTopology.java的代码打成jar包(此处命名为exclamation.jar)后,将jar包上传Linux服务器,使用下述命令提交拓扑

storm jar /home/test/exclamation.jar org.apache.storm.starter.ExclamationTopology  ExclamationTopology
//storm提交拓扑命令的说明
storm jar {jar-path} {jar-class} arg1 arg2
jar-path:拓扑打成的jar包所在的路径
jar-class:拓扑的主类
arg1 arg2 ……:main函数中String[] args中可以获取到的参数

成功提交拓扑执行可以在storm ui(http://{IP}:8080/),查看”Topology Summary”(8080端口在storm.yaml中配置)。

Kill拓扑
1.如果storm UI正常工作的话,可以在拓扑的详情界面中 Topology actions -> kill
2.命令: storm kill {Topology-name} -w 3 //-w 3是指等待3s后kill

工作日志
worker的工作日志位于 /home/storm/logs/workers-artifacts/ExclamationTopology-6-1491795978路径下以WORKER的通信端口(本文有两个worker)命名的文件夹下

ExclamationTopology-6-1491795978
├── 6702
│ ├── gc.log.0
│ ├── worker.log
│ ├── worker.log.err
│ ├── worker.log.metrics
│ ├── worker.log.out
│ ├── worker.pid
│ └── worker.yaml
└── 6703
├── gc.log.0
├── worker.log
├── worker.log.err
├── worker.log.metrics
├── worker.log.out
├── worker.pid
└── worker.yaml

由于开启了debug模式(conf.setDebug(true);)可以在worker日志中观察到spout和bolt的工作情况.