storm 实战及实例讲解(二)
前面已近介绍了storm集群的搭建,和使用场景,那么现在让我们一起来探讨一下storm具体该怎么使用吧。
首先,我们要明白如何创建一个topology,topology是storm集群上面运行的基本单元,而一个topology又可以有若干个sport和bolt以某种策略组合而成,关于storm的流分组等概念我们可以,参考这里的一些资料。提醒一下这些概念很重要。我们开发storm应用的第一步就是定义一个topolog,下面我讲直接上代码,如果这些概念搞不懂的话很难弄清楚。我回尽量把注释写清楚。下面这个例子定义了一个简单的topology,它包括一个数据喷发节点spout,和一个数据处理节点bolt。
package com.jd.comaple.storm.test.topology;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import com.jd.comaple.storm.test.bolt.SimpleBolt;
import com.jd.comaple.storm.test.spout.SimpleSpout;
/**
* Created by IntelliJ IDEA.
* User: comaple.zhang
* Date: 12-8-28
* Time: 下午2:11
* To change this template use File | Settings | File Templates.
*/
public class SimpleTopology {
public static void main(String[] args) {
try {
//实例化topologyBuilder类。
TopologyBuilder topologyBuilder = new TopologyBuilder();
//设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。
topologyBuilder.setSpout("simple-spout", new SimpleSpout(), 1);
// 设置数据处理节点,并分配并发数。指定该几点接收喷发节点的策略为随机方式。
topologyBuilder.setBolt("simple-bolt", new SimpleBolt(), 3).shuffleGrouping("simple-spout");
Config config = new Config();
config.setDebug(false);
if (args != null && args.length > 0) {
/*设置该topology在storm集群中要抢占的资源slot数,一个slot对应这supervisor节点上的以个worker进程
如果你分配的spot数超过了你的物理节点所拥有的worker数目的话,有可能提交不成功,加入你的集群上面已经有了
一些topology而现在还剩下2个worker资源,如果你在代码里分配4个给你的topology的话,那么这个topology可以提交
但是提交以后你会发现并没有运行。 而当你kill掉一些topology后释放了一些slot后你的这个topology就会恢复正常运行。
*/
config.setNumWorkers(1);
StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
} else {
//这里是本地模式下运行的启动代码。
config.setMaxTaskParallelism(1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("simple", config,
topologyBuilder.createTopology());
}
} catch (Exception e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
}