Clojure 实战 (5):Storm 实时计算框架

时间:2023-01-09 16:16:11

Storm简介

上一章介绍的Hadoop工具能够对海量数据进行批量处理,采用分布式的并行计算架构,只需使用其提供的MapReduce API编写脚本即可。但随着人们对数据实时性的要求越来越高,如实时日志分析、实时推荐系统等,Hadoop就无能为力了。

这时,Storm诞生了。它的设计初衷就是提供一套分布式的实时计算框架,实现低延迟、高并发的海量数据处理,被誉为“Realtime Hadoop”。它提供了简单易用的API接口用于编写实时处理脚本;能够和现有各类消息系统整合;提供了HA、容错、事务、RPC等高级特性。

Storm的官网是:storm-project.net,它的Wiki上有非常详尽的说明文档。

Storm与Clojure

Storm的主要贡献者Nathan Marz*明都是活跃的Clojure开发者,因此在Storm框架中也提供了原生的Clojure DSL。本文就将介绍如何使用这套DSL来编写Storm处理脚本。

Storm集群的安装配置这里不会讲述,具体请参考这篇文档。下文的脚本都运行在“本地模式”之下,因此即使不搭建集群也可以运行和调试。

Storm脚本的组件

Clojure 实战 (5):Storm 实时计算框架

Storm脚本的英文名称叫做“Storm Topology”,直译过来是“拓扑结构”。这个脚本由两大类组建构成,SpoutBolt,分别可以有任意多个。他们之间以“数据流”的方式连接起来,因此整体看来就像一张拓扑网络,因此得名Topology

Spout

数据源节点,是整个脚本的入口。Storm会不断调用该节点的nextTuple()方法来获取数据,分发给下游Bolt节点。nextTuple()方法中可以用各种方式从外部获取数据,如逐行读取一个文件、从消息队列(ZeroMQ、Kafka)中获取消息等。一个Storm脚本可以包含多个Spout节点,从而将多个数据流汇聚到一起进行处理。

Bolt

数据处理节点,它是脚本的核心逻辑。它含有一个execute()方法,当接收到消息时,Storm会调用这个函数,并将消息传递给它。我们可以在execute()中对消息进行过滤(只接收符合条件的数据),或者进行聚合(统计某个条件的数据出现的次数)等。处理完毕后,这个节点可以选择将处理后的消息继续传递下去,或是持久化到数据库中。

Bolt同样是可以有多个的,且能够前后组合。Bolt C可以同时收取Bolt ABolt B的数据,并将处理结果继续传递给Bolt D

此外, 一个Bolt可以产生多个实例 ,如某个Bolt包含复杂耗时的计算,那在运行时可以调高其并发数量(实例的个数),从而达到并行处理的目的。

Tuple

Tuple是消息传输的基本单元,一条消息即一个Tuple。可以将其看做是一个HashMap对象,它能够包含任何可序列化的数据内容。对于简单的数据类型,如整型、字符串、Map等,Storm提供了内置的序列化支持。而用户自定义的数据类型,可以通过指定序列化/反序列化函数来处理。

Stream Grouping

想象一个Spout连接了两个Bolt(或一个Bolt的两个实例),那数据应该如何分发呢?你可以选择轮询(ShuffleGrouping),或是广播(GlobalGrouping)、亦或是按照某一个字段进行哈希分组(FieldGrouping),这些都称作为Stream Grouping

示例:WordCount

下面我们就来实现一个实时版的WordCount脚本,它由以下几个组件构成:

  • sentence-spout:从已知的一段文字中随机选取一句话发送出来;
  • split-bolt:将这句话按空格分割成单词;
  • count-bolt:统计每个单词出现的次数,每五秒钟打印一次,并清零。

依赖项和配置文件

首先使用lein new新建一个项目,并修改project.clj文件:

(defproject cia-storm "0.1.0-SNAPSHOT"
...
:dependencies [[org.clojure/clojure "1.4.0"]
[org.clojure/tools.logging "0.2.6"]]

:profiles {:dev {:dependencies [[storm "0.8.2"]]}}
:plugins [[lein2-eclipse "2.0.0"]]
:aot [cia-storm.wordcount])

其中:profiles表示定义不同的用户配置文件。Leiningen有类似于Maven的配置文件体系(profile),每个配置文件中可以定义project.clj所支持的各种属性,执行时会进行合并。lein命令默认调用:dev:user等配置文件,可以使用lein with-profiles prod run来指定配置文件。具体可以参考这份文档

这里将[storm "0.8.2"]依赖项定义在了:dev配置下,如果直接定义在外层的:dependencies下,那在使用lein uberjar进行打包时,会将storm.jar包含在最终的Jar包中,提交到Storm集群运行时就会报冲突。而lein uberjar默认会跳过:dev配置,所以才这样定义。

:aot表示Ahead Of Time,即预编译。我们在Clojure实战(3)中提过:gen-class这个标识表示为当前.clj文件生成一个.class文件,从而能够作为main函数使用,因此也需要在project.clj中添加:main标识,指向这个.clj文件的命名空间。如果想为其它的命名空间也生成对应的.class文件,就需要用到:aot了。它的另一个用处是加速Clojure程序的启动速度。

sentence-spout

(ns cia-storm.wordcount
...
(:use [backtype.storm clojure config]))


(defspout sentence-spout ["sentence"]
[conf context collector]
(let [sentences ["a little brown dog"
"the man petted the dog"
"four score and seven years ago"
"an apple a day keeps the doctor away"]
]

(spout
(nextTuple []
(Thread/sleep 1000)
(emit-spout! collector [(rand-nth sentences)]))
)
)
)

defspout是定义在backtype.storm.clojure命名空间下的宏,可以点此查看源码。以下是各个部分的说明:

  • sentence-spout是该组件的名称。
  • ["sentence"]表示该组件输出一个字段,名称为“sentence”。
  • [conf context collector]用于接收Storm框架传入的参数,如配置对象、上下文对象、下游消息收集器等。
  • spout表示开始定义数据源组件需要用到的各类方法。它实质上是生成一个实现了ISpout接口的对象,从而能够被Storm框架调用。
  • nextTuple是ISpout接口必须实现的方法之一,Storm会不断调用这个方法,获取数据。这里使用Thread#sleep函数来控制调用的频率。
  • emit-spout!是一个函数,用于向下游发送消息。

ISpout还有open、ack、fail等函数,分别表示初始化、消息处理成功的回调、消息处理失败的回调。这里我们暂不深入讨论。

split-bolt

(defbolt split-bolt ["word"] {:prepare true}
[conf context collector]
(bolt
(execute [tuple]
(let [words (.split (.getString tuple 0) " ")]
(doseq [w words]
(emit-bolt! collector [w]))
)

(ack! collector tuple))
)
)

defbolt用于定义一个Bolt组件。整段代码的结构和defspout是比较相似的。bolt宏会实现为一个IBolt对象,execute是该接口的方法之一,其它还有preparecleanupexecute方法接收一个参数tuple,用于接收上游消息。

ack!execute中必须调用的一个方法。Storm会对每一个组件发送出来的消息进行追踪,上游组件发出的消息需要得到下游组件的“确认”(ACKnowlege),否则会一直堆积在内存中。对于Spout而言,如果消息得到确认,会触发ISpout#ack函数,否则会触发ISpout#fail函数,这时Spout可以选择重发或报错。

代码中比较怪异的是{:prepare true}defspoutdefbolt有两种定义方式,即prepare和非prepare。两者的区别在于:

  • 参数不同,prepare方式下接收的参数是[conf context collector],非prepare方式下,defspout接收的是[collector]defbolt是[tuple collector]`。
  • prepare方式下需要调用spoutbolt宏来编写组件代码,而非prepare方式则不需要——defspout会默认生成nextTuple()函数,defbolt默认生成execute(tuple)
  • 只有prepare方式下才能指定ISpout#openIBolt#prepare等函数,非prepare不能。
  • defspout默认使用prepare方式,defbolt默认使用非prepare方式。

因此,split-bolt可以按如下方式重写:

(defbolt split-bolt ["word"]
[tuple collector]
(let [words (.split (.getString tuple 0) " ")]
(doseq [w words]
(emit-bolt! collector [w]))

(ack! collector tuple))
)

prepare方式可以用于在组件中保存状态,具体请看下面的计数Bolt。

count-bolt

(defbolt count-bolt [] {:prepare true}
[conf context collector]
(let [counts (atom {})]
(bolt
(execute [tuple]
(let [word (.getString tuple 0)]
(swap! counts (partial merge-with +) {word 1}))

(ack! collector tuple))
)
)
)

原子(Atom)

atom是我们遇到的第一个可变量(Mutable Variable),其它的有Ref、Agent等。Atom是“原子”的意思,我们很容易想到原子性操作,即同一时刻只有一个线程能够修改Atom的值,因此它是处理并发的一种方式。这里我们使用Atom来保存每个单词出现的数量。以下是Atom的常用操作:

user=> (def cnt (atom 0))
user=> (println @cnt) ; 使用@符号获取Atom中的值。
0
user=> (swap! cnt inc) ; 将cnt中的值置换为(inc @cnt),并返回该新的值
1
user=> (println @cnt)
1
user=> (swap! cnt + 10) ; 新值为(+ @cnt 10)
11
user=> (reset! cnt 0) ; 归零
0

需要注意的是,(swap! atom f arg ...)中的f函数可能会被执行多次,因此要确保它没有副作用(side-effect,即不会产生其它状态的变化)。

再来解释一下(partial merge-with +)merge-with函数是对map类型的一种操作,表示将一个或多个map合并起来。和merge不同的是,merge-with多接收一个f函数(merge-with [f & maps]),当键名重复时,会用f函数去合并它们的值,而不是直接替代。

partial可以简单理解为给函数设定默认值,如:

user=> (defn add [a b] (+ a b))
user=> (add 5 10)
15
user=> (def add-5 (partial add 5))
user=> (add-5 10)
15

这样一来,(swap! counts (partial merge-with +) {word 1})就可理解为:将counts这个Atom中的值(一个map类型)和{word 1}这个map进行合并,如果单词已存在,则递增1。

线程(Thread)

为了输出统计值,我们为count-bolt增加prepare方法:

...
(bolt
(prepare [conf context collector]
(.start (Thread. (fn []
(while (not (Thread/interrupted))
(logging/info
(clojure.string/join ", "
(for [[word count] @counts]
(str word ": " count))
)
)

(reset! counts {})
(Thread/sleep 5000))
)
)
)
)
)

...

这段代码的功能是:在Bolt开始处理消息之前启动一个线程,每隔5秒钟将(atom counts)中的单词出现次数打印出来,并对其进行清零操作。

这里我们直接使用了Java的Thread类型。读者可能会觉得好奇,Thread类型的构造函数只接收实现Runnable接口的对象,Clojure的匿名函数直接支持吗?我们做一个简单测试:

user=> (defn greet [name] (println "Hi" name))
user=> (instance? Runnable greet)
true
user=> (instance? Runnable #(+ 1 %))
true

logging命名空间对应的依赖是[org.clojure/tools.logging "0.2.6"],需要将其添加到project.clj中,它是对log4j组件的包装。这里之所以没有使用println输出到标准输出,是为了将该脚本上传到Storm集群中运行时也能查看到日志输出。

定义和执行Topology

各个组件已经定义完毕,下面让我们用它们组成一个Topology:

(defn mk-topology []
(topology
{"sentence" (spout-spec sentence-spout)}
{"split" (bolt-spec {"sentence" :shuffle}
split-bolt
:p 3)

"count" (bolt-spec {"split" ["word"]}
count-bolt
:p 2)
}
)
)

topology同样是Clojure DSL定义的宏,它接收两个map作为参数,一个用于定义使用到的Spout,一个则是Bolt。该map的键是组件的名称,该名称用于确定各组件之间的关系。

spout-specbolt-spec则定义了组件在Topology中更具体的参数。如”split”使用的是split-bolt这个组件,它的上游是”sentence”,使用shuffleGrouping来对消息进行分配,:p 3表示会启动3个split-bolt实例。

“count”使用count-bolt组件,上游是”split”,但聚合方式采用了fieldGrouping,因此列出了执行哈希运算时使用的消息字段(word)。为何要使用fieldGrouping?因为我们会开启两个count-bolt,如果采用shuffleGrouping,那单词“a”第一次出现的消息会发送给一个count-bolt,第二次出现会发送给另一个count-bolt,这样统计结果就会错乱。如果指定了:p 1,即只开启一个count-bolt实例,就不会有这样的问题。

本地模式和Cluster模式

(ns cia-storm.wordcount
(:import [backtype.storm StormSubmitter LocalCluster])
...
(:gen-class))


(defn run-local! []
(let [cluster (LocalCluster.)]
(.submitTopology cluster
"wordcount" {} (mk-topology))

(Thread/sleep 30000)
(.shutdown cluster))
)


(defn submit-topology! [name]
(StormSubmitter/submitTopology
name {TOPOLOGY-WORKERS 3} (mk-topology))
)


(defn -main
([]
(run-local!))

([name]
(submit-topology! name))
)

我们为WordCount生成一个类,它的main函数在没有命令行参数时会以本地模式执行Topology,若传递了参数(即指定了脚本在Cluster运行时的名称),则提交至Cluster。

这里直接使用了Storm的Java类,对参数有疑惑的可以参考JavadocTOPOLOGY-WORKERS是在backtype.storm.config命名空间中定义的,我们在前面的代码中:use过了。Storm这个项目是用Java和Clojure混写的,所以查阅代码时还需仔细一些。

运行结果

首先我们直接用lein以本地模式运行该Topology:

$ lein run -m cia-storm.wordcount
6996 [Thread-18] INFO cia-storm.wordcount - doctor: 17, the: 31, a: 29, an: 17, ago: 13, seven: 13, and: 13
6998 [Thread-21] INFO cia-storm.wordcount - four: 13, keeps: 17, away: 17, score: 13, petted: 7, brown: 12, little: 12, years: 13, man: 7, apple: 17, dog: 19, day: 17
11997 [Thread-18] INFO cia-storm.wordcount - ago: 6, seven: 6, and: 6, doctor: 7, an: 7, the: 39, a: 28
11998 [Thread-21] INFO cia-storm.wordcount - four: 6, keeps: 7, away: 7, score: 6, petted: 16, brown: 21, little: 21, years: 6, man: 16, apple: 7, dog: 37, day: 7

Cluster模式需要搭建本地集群,可以参考这篇文档。下文使用的storm命令则需要配置~/.storm/storm.yaml文件,具体请参考这篇文章

$ lein do clean, compile, uberjar
$ storm jar target/cia-storm-0.1.0-SNAPSHOT-standalone.jar cia_storm.wordcount wordcount
$ cd /path/to/storm/logs
$ tail worker-6700.log
2013-05-11 21:26:15 wordcount [INFO] four: 9, keeps: 15, away: 15, score: 9, petted: 16, brown: 9, little: 9, years: 9, man: 16, apple: 15, dog: 25, day: 15
2013-05-11 21:26:20 wordcount [INFO] four: 10, keeps: 9, away: 9, score: 10, petted: 18, brown: 13, little: 13, years: 10, man: 18, apple: 9, dog: 31, day: 9
$ tail worker-6701.log
2013-05-11 21:27:10 wordcount [INFO] ago: 12, seven: 12, and: 12, doctor: 11, a: 31, an: 11, the: 25
2013-05-11 21:27:15 wordcount [INFO] ago: 14, seven: 14, and: 14, doctor: 11, the: 43, a: 19, an: 11

小结

这一章我们简单介绍了Storm的设计初衷,它是如何通过分布式并行运算解决实时数据分析问题的。Storm目前已经十分稳定,且仍处于活跃的开发状态。它的一些高级特性如DRPC、Trident等,还请感兴趣的读者自行研究。

本文使用的WordCount示例代码:https://github.com/jizhang/cia-storm