Storm入门教程
1. Storm基础
Storm
Storm主要特点
Storm基本概念
Storm调度器
Storm配置
Guaranteeing Message Processing(消息处理保障机制)
Daemon Fault Tolerance(守护线程容错机制)
理解Storm拓扑的并行
Tutorial
Preliminaries(前期准备工作)
Storm集群的
Topologies
Streams
Data model
A simple topology
在本地模式下运行Exclamation Topology
Streaming grouping
用其他语言定义Bolts
消息保证处理机制
事务topologies
分布式RPC
Local模式
在生产环境中运行Topologies
Tutorial
在本文档中会阐述如何创建Storm拓扑并把它们部署在Storm集群中。Java是主要使用的编程语言,但也有一些例子是用Python写的,这也表明Storm是支持多种编程语言的。
Preliminaries(前期准备工作)
本文的例子来自storm-starter项目。建议你clone该项目并跟着下面的例子一起做。
Storm集群的组件
Storm集群类似于Hadoop集群,在Hadoop中执行的是“MapReduce jobs”,而在Storm中执行的是“topologies”,他们两者之间有很大区别,其中最大的区别是MapReduce job最终会执行完成,而topology会一直运行直到你手动杀死。
在Storm集群中有两种节点:master和worker。master节点运行在一个名为“Nimbus”的守护进程上,它类似于Hadoop中的“JobTracker”,Nimbus负责给集群分发代码、安排执行的任务和任务的监控。
每个worker节点运行在一个名为“Supervisor”的守护进程中。supervisor所在节点根据Nimbus分配给该节点的work来启动或停止worker进程,每个worker进程执行一个topology的部分,因此,一个运行的topology是由集群中的许多worker进程共同执行完成的。
Nimbus和Supervisor是通过Zookeeper集群来协调的,另外,Nimbus和Supervisor守护进程是快速失败和无状态的。所有的状态都保存在Zookeeper或本地磁盘上,这意味着你用kill -9将Nimbus或Supervisor杀死,它们重新启动后就像什么也没有发生一样,这种机制的设计使得Storm集群很稳定。
Topologies
在Storm中用topologies来进行实时计算。一个topology 是一个图计算,一个topology 中的每个节点包括处理逻辑,节点之间的连接表明数据如何在节点之间传输。运行一个topology 是很直观简单的,首先,你将程序代码及依赖包打包成一个jar包。然后,运行类似下面的命令:
storm jar all-my-code.jar org.apache.storm.MyTopology arg1 arg2
上面程序运行类org.apache.storm.MyTopology,参数为arg1 和arg2。该类的主函数中定义了topology 并提交到Nimbus中。storm jar部分会连接Nimbus并上传jar。
由于topology 定义是Thrift结构,Nimbus是一种Thrift服务,因此,你可以用任何一种语言创建和提交topologies 。
Streams
在Storm中主要的抽象是“stream”。一个stream是一个无边界的元组序列。Storm提供了将一个stream转换为一个新的分布式、可靠的stream的原函数。例如,你可以将tweets流转化为trending topics流。
Storm提供的基本流转换原函数有"spouts" and "bolts"。Spouts和bolts定义了运行你应用程序的具体逻辑的接口。
一个spout是一个stream的源头。例如,一个spout从Kestrel队列中读取tuples然后作为stream发送。或spout连接到Twitter的API然后作为twitter stream 发送。
一个bolt接入输入流、处理、可能作为新stream输出。复杂的流转换,比如根据tweets的流计算trending topics流,需要多个步骤和多个bolts。Bolts能做过滤tuples,流聚合,流连接,与数据库交互等。
spouts和bolts形成的网结构被打包成topology,它是Storm集群中*的抽象。一个topology是流转换的形成的图结构,每个节点是一个spout或bolt,图中的边表明bolts是连接的那个流。当spout和bolt给流发送tuple时,它给订阅该流的每个bolt发送tuple。
topology节点间的连接表明了tuple如何被传递。例如,如果在Spout A和Bolt B之间有连接,Spout A和Bolt C之间有连接,Bolt B和Bolt C之间有连接,然后Spout A每次发送一个tuple,它将同时发送给Bolt B和Bolt C。所有的Bolt B的输出将输入Bolt C。
Storm的topology 的每个节点的执行时并行的。你可以topology中指定每个节点的并行度,那么Storm将会启动对应的线程数去执行任务。
一个topology将永久执行,除非你手动停止。Storm会再次执行失败的task。另外,Storm保证了数据零丢失,即使在机器宕机或消息丢失的情况下。
Data model
Storm用tuples作为数据模型。一个元组是一组值的集合且tuple中的一个域可以是任何对象类型。创造性地,storm支持所有的原始类型,string,byte array,如果还想用其它类型,只需该类型实现序列化即可。
topology 中的每个节点必须声明输出tuple的域。如下所示,这个bolt用域"double" 和"triple"声明为二元组。
public class DoubleAndTripleBolt extends BaseRichBolt {
private OutputCollectorBase _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
_collector = collector;
}
@Override
public void execute(Tuple input) {
int val = input.getInteger(0);
_collector.emit(input, new Values(val*2, val*3));
_collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("double", "triple"));
} }
declareOutputFields 函数为这个组件声明了输出域["double", "triple"]。
A simple topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
.shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
.shuffleGrouping("exclaim1");
上述topology包括一个spout和两个bolts。spout发送单词,每个bolt给它的输入附加字符串“!!!”,spout发送给第一个bolt,第一个bolt在发送给第二个bolt。如果spout发送元组为["bob"] 和["john"],那么第二个bolt将会输出["bob!!!!!!"]和["john!!!!!!"]。
代码用setSpout 和setBolt方法定义节点。这些方法接受用户指定的id、包括代码处理逻辑的对象和并行度。在本例中,spout的id为“words”,bolts的id分别为 "exclaim1" 和 "exclaim2"。
Spout中包含处理逻辑的对象实现了IRichSpout接口,bolt中包含处理逻辑的对象实现了IRichBolt接口。
最后一个参数,节点的并行度参数是可选的。它表明集群中有多少个线程执行该组件,如果你不设置,Storm默认分配一个线程。
setBolt 方法返回InputDeclarer对象,它被用来定义从Bolt的输入。这里,组件"exclaim1"声明了它想用shuffle grouping读取组件“words”发送的所有元组,组件“exclaim2”声明了它想用shuffle grouping读取组件“exclaim1”发送的所有元组。“shuffle grouping”是输入任务中的元组应随机分发到bolt的任务中。组件中有许多方式用来对数据分组。下面会有介绍。
如果你想要组件“exclaim2”读取组件“words“”words”的所有元组,你应该进行如下声明:
builder.setBolt("exclaim2", new ExclamationBolt(), 5)
.shuffleGrouping("words")
.shuffleGrouping("exclaim1");
正如你所见,输入声明能为Bolt指定多个源。
让我们探究一下这个topology中spouts和bolt是怎么实现的。Spouts负责发送新的消息。类TestWordSpout 每100ms从list集合["nathan", "mike", "jackson", "golda", "bertels"]中随机选取单词作为一元组发送出去,该类中的nextTuple()方法实现如下:
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));}
类ExclamationBolt给它的输入附加字符串“!!!”,下面是它的代码实现:
public static class ExclamationBolt implements IRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}}
prepare方法中提供了类OutputCollector ,它被用来发送元组。在这个bolt中元组可以随时发送在prepare、execute或cleanup方法中,甚至在另外一个线程中。prepare方法将OutputCollector 作为一个实例变量随后在execute方法中使用。
execute方法接受bolt的输入,类ExclamationBolt 获取第一个元组第一个域然后附加“”“!!!”作为新的tuple发送出去。如果你使一个bolt订阅了多个输入源,你可以用该方法Tuple#getSourceComponent找到。
在execute方法中还有一些任务要处理,也就是说输入元组是作为发送的第一个参数,然后最后一行响应输入元组,这是为了确保数据零丢失。
当Bolt正在被关闭和释放已打开的资源时,调用cleanup方法,但不能确保该方法会调用,例如,如果机器中任务一直在运行,那么就不会触发该方法。该方法是为了本地模式设计的和在没有资源泄露的前提下结束topology的运行。
declareOutputFields 方法声明了ExclamationBolt 发送带有一个名为“word”域的一元组。
getComponentConfiguration 方法允许配置该组件是如何运行的。
像cleanup 和getComponentConfiguration 方法在bolt的实现中不是必须的。你可以用几个基类作为默认实现,从而使代码更简洁。ExclamationBolt 可以通过继承BaseRichBolt使代码简洁:
public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
} }
在本地模式下运行Exclamation Topology
Storm有两种运行模式:本地和集群分布式模式。在本地模式中,Storm用线程模拟worker节点执行任务。本地模式适合topologies的测试和开发,当你运行storm-starter中的topologies ,它们将以本地模式运行并且你能看到每个组件发送的消息。更多详情看Local mode。
在分布式集群模式中,Storm管理机器组成的集群。当你提交topology到master时,你也提交了运行topology的代码。master将负责分发你的代码和分配worker来运行topology。如果worker宕机,master会重新安排其它的。更多详情看Running topologies on a production cluster。
下面是Exclamation Topology本地模式的代码:
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);cluster.killTopology("test");
cluster.shutdown();
首先,代码用LocalCluster对象定义了一个进程内的集群。提交topology给这个虚拟集群和提交到分布式集群中是一样的。通过调用LocalCluster的submitTopology提交了一个topology,该方法输入参数为运行的topology的名称、配置及其本身。
名称用来识别topology,也可以随后用来kill。一个topology一直运行直到你手动kill。
下面两个配置是非常常见的:
1.TOPOLOGY_WORKERS (用函数setNumWorkers)表明你想分配多少进程来运行topology。topology的每个组件将会用多线程执行。执行每个组件的线程数目通过setBolt 和setSpout 两种函数来配置。线程是在worker进程中创建的,每个worker进程包含执行组件的线程。例如,你可能有300个线程和50个worker进程,那么每个worker进程将会有6个线程,这6个线程可能用来执行不同的组件。你可以通过调整每个组件的并行度和worker进程数来改善Storm topology的性能。
2.TOPOLOGY_DEBUG (用函数setDebug),当设置为true时,Storm会记录组件发送的每条消息。当本地模式测试topology是非常有用的,当在集群模式中一般会关闭该属性。更多配置请看the Javadoc for Config。
Streaming grouping
streaming grouping告诉topology如何在两个组件中发送元组,记住,spouts和bolts是并行执行的。如果topology以task等级视角来看,如下图:
“stream grouping”告诉Storm如何在任务集中发送元组。在我们探究stream grouping之前,先看一下storm-starter。类WordCountTopology从spout读取句子和WordCountBolt 发出的流:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8)
.shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 12)
.fieldsGrouping("split", new Fields("word"));
SplitSentence 将它接受到的句子中的每个单词作为元组发送出去,WordCount在内存中保存着单词和出现次数的map集合,WordCount 每次接受到一个单词,更新map的状态。
下面介绍一些stream groupings。
最简单的grouping是shuffle grouping,它给任务发送随机的元组。一个shuffle grouping被用在WordCountTopology 类中从RandomSentenceSpout 发送元组给SplitSentence ,它的作用是能平均地将所有的元组分配给SplitSentence bolt的任务中。
更有趣的一种grouping是“fields grouping”。这种grouping用在SplitSentence bolt和WordCount bolt之间。对于WordCount bolt关键的功能是同样的单词应该分配给同样的task,否则,不止一个任务将会处理同样的单词,甚至,它们将会发送错误值。fields grouping让stream通过fileds来分组,这会导致相同的值发送给同样的task处理。由于WordCount 用fields grouping订阅了SplitSentence's 的输出流,所以,同样的单词将分发给同样的任务且bolt会生成正确的输出。
Fields groupings是流连接和聚合的基础,它是用取余哈希来实现的。其他更多的stream grouping。
用其他语言定义Bolts
Bolts能用任一一种语言定义。用另外一种语言编写的bolts作为子过程执行,storm用JSON格式的消息和那些子过程通信。通信协议由100行代码的适配库编写,适配Ruby、Python、和Fancy等语言库。
下面是WordCountTopology中SplitSentence 的定义:
public static class SplitSentence extends ShellBolt implements IRichBolt {
public SplitSentence() {
super("python", "splitsentence.py");
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}}
SplitSentence 覆写ShellBolt 并用python命令运行splitsentence.py代码,其代码实现如下:
import storm
class SplitSentenceBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
for word in words:
storm.emit([word])
SplitSentenceBolt().run()
用其他语言编写spouts和bolts,请看Using non-JVM languages with Storm.
消息保证处理机制
事务topologies
Storm保证每条消息至少处理一次。一个常见的问题是“你如何在Storm上做count?不会多次计算吗?”,Storm的特征:事务topologies保证对于大多数计算只计算一次。详情请看transactional topologies。
分布式RPC
该文档阐述了Storm基本流处理相关知识。关于Storm还有更多有趣的事,如最有趣的应用之一是分布式RPC,你可以并行计算内部函数。详情请看Distributed RPC。