5. Storm高级篇
序列化
分布式RPC
High level overview
LinearDRPCTopologyBuilder
Local mode DRPC
Remote mode DRPC
更复杂的例子
Non-linear DRPC topologies
LinearDRPCTopologyBuilder如何起作用
Advanced
分布式RPC
分布式 RPC(DRPC)的设计目标是充分利用 Storm 的计算能力实现高密度的并行实时计算。Storm 接收若干个函数参数作为输入流,然后通过 DRPC 输出这些函数调用的结果。严格来说,DRPC 并不能算作是 Storm 的一个特性,因为它只是一种基于 Storm 原语 (Stream、Spout、Bolt、Topology) 实现的计算模式。虽然可以将 DRPC 从 Storm 中打包出来作为一个独立的库,但是与 Storm 集成在一起显然更有用。
High level overview
分布式RPC是通过“DRPC server”协调处理的(Storm用一个包来实现该功能)。DRPC server 负责接收 RPC 请求,并将该请求发送到 Storm 中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。因此,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。例如,以下是一个使用参数 “http://twitter.com” 调用 “reach” 函数计算结果的例子:
DRPCClient client = new DRPCClient("drpc-host", 3772);
String result = client.execute("reach", "http://twitter.com");
分布式RPC工作流示意图如下所示:
客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。实现该函数的拓扑使用一个DRPCSpout 从 DRPC server中接收一个函数调用流,DRPC Server会为每个函数调用都标记了一个唯一的 id,随后拓扑会执行函数来计算结果,并在拓扑的最后使用一个名为 ReturnResults 的 bolt 连接到 DRPC Server,根据函数调用的 id 来将函数调用的结果返回。
LinearDRPCTopologyBuilder
Storm中提供了名为LinearDRPCTopologyBuilder 的topology builder,它几乎自动完成了DRPC的所有步骤,如下所示:
1.设置spout
2.向DRPC server返回运行结果。
3.给bolts提供了聚集元组的功能。
让我们一起看一下简单的例子,该例子是DRPC topology的一个实现并返回结果为输入附加字符串“!”。
public static class ExclaimBolt extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector collector) {
String input = tuple.getString(1);
collector.emit(new Values(tuple.getValue(0), input + "!"));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "result"));
}}
public static void main(String[] args) throws Exception {
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
builder.addBolt(new ExclaimBolt(), 3);
// ...}
正如你所见,当创建LinearDRPCTopologyBuilder 时,你需要让topology知道 DRPC函数的名字。单个DRPC server负责很多函数的协调处理,且这些函数的功能不同。你声明的第一个bolt将接受一个2元组,第一个域是请求id,第二个域是请求参数。LinearDRPCTopologyBuilder 中最后一个bolt会输出形式为[id,result]的2元组输出流。最后,所有中间结果的元组的第一个域必须包括请求id。
在本例子中,ExclaimBolt 只是简单地给第二个域附加字符串“!”。LinearDRPCTopologyBuilder 继续和DRPC server通信并将结果返回。
Local mode DRPC
DRPC可以以本地模式运行,下面以本地模式运行的例子:
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));
cluster.shutdown();
drpc.shutdown();
首先你会创建一个 LocalDPRC 对象,该对象会在进程中模拟一个 DRPC 服务器,就像LocalCluster 在进程中模拟 Storm 集群的功能一样。然后,创建LocalCluster以本地模式运行topology 。LinearDRPCTopologyBuilder 有独立的方法用于创建本地topologies 和远程topologies 。在本地模式下,LocalDRPC 对象没有绑定任何端口所以topology需要知道正在和它进行通信的对象,这是方法createLocalTopology 接受LocalDRPC 对象作为输入参数的原因。
在启动拓扑后,你可以使用 execute 方法来完成 DRPC 调用。
Remote mode DRPC
在一个真实的集群中使用 DRPC 有以下三个步骤:
1.启动 DRPC Server;
2.配置 DRPC Server的地址;
3.将 DRPC topologies 提交到集群运行。
可以像 Nimbus、Supervisor 那样使用 storm 命令来启动 DRPC Serve,如下:
bin/storm drpc
接下来,你需要在集群上配置 DRPC Server的地址。这是为了让 DRPCSpout 获取从哪里触发函数调用的方法。可以通过编辑 storm.yaml 或者添加拓扑配置的方式实现配置。配置 storm.yaml 的方式类似于下面这样:
drpc.servers:
- "drpc1.foo.com"
- "drpc2.foo.com"
最后,你可以像其他拓扑一样使用 StormSubmitter 来启动拓扑。以下是使用远程模式构造拓扑的一个例子:
StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());
createRemoteTopology 方法是用来创建集群模式下运行的topologies 。
更复杂的例子
上面描述的exclamation DRPC是为了说明DRPC的简单例子。下面让我们共同学习一下更复杂的例子-Storm集群并行计算的DRPC函数调用,该例子是计算Twitter上URL的访问。
URL访问是指不同的人在Twitter上发的推文,你需要完成如下计算:
1.获取所有tweeted了该URL的所有人。
2.获取所有关注了1中的所有人。
3.2中所有人的set集合。
4.统计3中set集合的个数。
一次计算可能涉及上百次的数据库调用和数以千万计的关注记录,这计算规模确实很大。正如你所见,实现Storm函数是非常简单的。在单台机器上,计算需要1分钟;但在集群中,即使最难计算的URL访问也只需数秒。
一个简单的访问topology 可以在storm-starter中找到。下面是定义访问topology 的具体步骤:
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
builder.addBolt(new GetTweeters(), 3);
builder.addBolt(new GetFollowers(), 12)
.shuffleGrouping();builder.addBolt(new PartialUniquer(), 6)
.fieldsGrouping(new Fields("id", "follower"));
builder.addBolt(new CountAggregator(), 2)
.fieldsGrouping(new Fields("id"));
topology 的执行按照以下四步骤:
1.GetTweeters 得到tweeted了URL的用户。它将[id,url]格式的输入流转换为[id,tweeter]格式的输出流。每个url将映射为多个tweeter元组。
2.GetFollowers得到tweeters的关注者。它将[id,tweeter]格式的输入流转换为[id,follower]格式的输出流。这些任务中,可能有重复的元组,因为一些人可能关注了多个人都tweeted了同样的URL。
3.PartialUniquer根据关注者id分组。这会导致同样的关注者在同样的任务中处理,所以每个PartialUniquer 的任务将会接受多个互补的关注者集合。一旦PartialUniquer 接受了所有的关注者元组,它将会输出关注者子集合元素的个数。
4.最后,CountAggregator 从PartialUniquer 接受部分count值然后累加完成整个计算,并返回结果。
下面看PartialUniquer bolt的代码实现:
public class PartialUniquer extends BaseBatchBolt {
BatchOutputCollector _collector;
Object _id;
Set<String> _followers = new HashSet<String>();
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
_collector = collector;
_id = id;
}
@Override
public void execute(Tuple tuple) {
_followers.add(tuple.getString(1));
}
@Override
public void finishBatch() {
_collector.emit(new Values(_id, _followers.size()));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "partial-count"));
}}
PartialUniquer 通过继承BaseBatchBolt 间接实现了IBatchBolt 接口。batch bolt提供了第一个类API,用于将一批元组作为一个具体单元进行处理。对于每个请求id,会创建一个新的batch bolt实例,在适当时候Storm也会清理这些实例。
当PartialUniquer 在execute方法中接受了一个关注者元组时,会将该元组添加到请求id的set集合中。
当任务中的一批元组处理完成后会调用batch bolts的finishBatch 方法,该方法输出关注者id集合元素个数形式的一元组。在后台,CoordinatedBolt 用来监测何时给定bolt已经接受到了请求id所有的元组,它用direct stream来管理。
topology 中剩下的部分都能很容易明白,正如你看到的,每个访问计算步骤都是并行完成的,并且定义DRPC topology是非常简单的。
Non-linear DRPC topologies
LinearDRPCTopologyBuilder 只处理线性DRPC的topology,它的计算是一个序列步骤。不难想象功能需求将需要更复杂的topology ,它可能涉及到bolts的分支与整合。
LinearDRPCTopologyBuilder如何起作用
DRPCSpout 发送[args,return-info]元组。return-info是DRPC server的主机名、端口号和生成的id。
topology的构造参数包括:
DRPCSpout
PrepareRequest (生成请求id和创建返回信息stream和参数stream)
CoordinatedBolt wrappers and direct groupings
JoinResult (返回信息进行join操作)
ReturnResult (连接到DRPC server并返回结果)
LinearDRPCTopologyBuilder 是一个很好的例子
Advanced
KeyedFairBolt 同时处理多个请求。
如何直接使用CoordinatedBolt。