一、简介
Storm 可以保证 spout 发出的每条消息都能被“完全处理” ,这也是直接区别于其他实时系统的地方,如 S4。
请注意,spout 发出的消息后续可能会触发产生成千上万条消息 ,可以形象的理解为一棵消息树, 其中 spout 发出的消息为树根, Storm会跟踪这棵消息树的处理情况, 只有当这棵消息树中的所有消息都被处理了,Storm 才会认为 spout 发出的这个消息已经被“完全处理” 。如果这棵消息树中的任何一个消息处理失败了, 或者整棵消息树在限定的时间内没有“完全处理” ,那么 spout 发出的消息就会重发。
二、理解消息被完整处理
一个消息(tuple)从 spout 发送出来, 可能会导致成百上千的消息 基于此消息被创建
以“单词统计”为例:它的消息可分裂为多个消息,这些消息构成一个树状结构,被称为“tuple tree”,如图
当下面的条件同时被满足,Storm 才会认为一个从 spout 发送出来的消息被完整处理.
- tuple tree 不再生长
- 树中的任何消息被标识为“已处理”
如果在指定的时间内,一个消息衍生出来的 tuple tree 未被完全 处理成功,则认为此消息未被完整处理。这个超时值可以通过任务级 参数Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 进行配置,默认超时 值为30秒。
考虑到尽可能减少对内存的消耗,Storm 并不会跟踪消息树中的每个消息,而是采用了一些特殊的策略,它把消息树当作一个整体来跟踪,对消息树中所有消息的唯一 id 进行异或计算,通过是否为零来判定 spout 发出的消息是否被“完全处理” ,这极大的节约了内存和简化了判定逻辑。
上面所说的,Storm 保证了每个消息至少被处理一次,但是对于有些计算场合,会严格要求每个消息只被处理一次,幸而 Storm 的0.7.0引入了事务性拓扑,解决了这个问题,后面会有详述。
三、消息的生命周期
要理解消息的可靠性,就要理解消息的生命周期,因为消息是在几个生命状态中切换。
Spout的重要接口:
一、消息的生命
- 首先, Storm 使用 spout 实例的 nextTuple()方法从 spout 请求 一个消息(tuple) 。
- 收到请求以后,spout 使用 open 方法中提供的 SpoutOutputCollector 向它的输出流发送一个或多个消息。每发送 一个消息,Spout 会给这个消息提供一个 message ID,它将会被用来 标识这个消息。SpoutOutputCollector 中发送消息格式如下: collector.emit(new Values(sentence), "messageId")。
- 接来下, 这些消息会被发送到后续业务处理的 bolts,并且 Storm 会跟踪由此消息产生出来的新消息。
- 当检测到一个消息衍生出来的 tuple tree 被完整处理后,Storm 会调用 Spout 中的 ack 方法,并将 此消息的 messageID 作为参数传入。
- 同理,如果某消息处理超时或失败, 则 此消息对应的 Spout 的 fail 方法会被调用,调用时此消息的 messageID 会被作为参数传入
二、消息’被处理‘的细节——重要
- 当 KestrelSpout 从 kestrel 队列中读取一个消息, 表示它 “打开 ” 了队列中某个消息。这意味着,此消息并未从队列中真正的删除, 而 是将此消息设置为“pending”状态,它等待来自客户端的应答,被 应答以后,此消息才会被真正的从队列中删除。
- 处于“pending”状 态的消息不会被其他的客户端看到。另外,如果一个客户端意外的断 开连接, 则由此客户端 “打开” 的所有消息都会被重新加入到队列中 。
- 当消息被“打开”的时候,kestrel 队列同时会为这个消息提供一个 唯一的标识。
- KestrelSpout 就是使用这个唯一的标识作为这个 tuple 的 messageID 的。稍后当 ack 或 fail 被调用的时候,KestrelSpout 会 把 ack 或者 fail 连同 messageID 一起发送给 kestrel 队列, kestrel 会将消息从队列中真正删除或者将它重新放回队列中。
四、可靠相关的 api
为了使用 Storm 提供的可靠处理特性,我们需要做两件事情:
- 无论何时在 tuple tree 中创建了一个新的节点,我们需要明确的 通知 Storm:collector.emit(tuple,new Values(word));
- 当处理完一个单独的消息时,我们需要告诉 Storm 这棵 tuple tree 的变化状态:collector.ack(tuple);
五、acker
Storm 系统中有一组叫做“acker”的特殊的任务,它们负责跟踪 DAG(有向无环图)中的每个消息。每当发现一个 DAG 被完全处理, 它就向创建这个根消息的 spout 任务发送一个信号。 拓扑中 acker 任 务的并行度可以通过配置参数 Config.TOPOLOGY_ACKERS 来设置。默 认的 acker 任务并行度为1,当系统中有大量的消息时,应该适当提 高 acker 任务的并发度。
每当 bolt 新生成一个消息,对应 tuple tree 中的根消息的 messageId 就拷贝到这个消息中。当这个消息被应答的时候,它就把 关于 tuple tree 变化的信息发送给跟踪这棵树的 acker。例如,他 会告诉 acker: 本消息已经处理完毕, 但是我派生出了一些新的消息 , 帮忙跟踪一下吧。
举个例子,假设消息 D 和 E 是由消息 C 派生出来的,这里演示了 消息 C 被应答时,tuple tree 是如何变化的。因为在 C 被从树中移除的同时 D 和 E 会被加入到 tuple tree 中 , 因此 tuple tree 不会被过早的认为已完全处理。
当 spout 发送一个消息的时候,它就通知对应的 acker 一个新的 根消息产生了,这时 acker 就会创建一个新的 tuple tree。当 acker 发现这棵树被完全处理之后,他就会通知对应的spout 任务。
Acker 任务是轻量级的,所以在拓扑中并不需要太多的 acker 存 在。可以通过 Storm UI 来观察 acker 任务的吞吐量,如果看上去吞 吐量不够的话,说明需要添加额外的 acker——config.setNumAckers(num)
关闭可靠性(任意一个即可):
- 将 参 数 Config.TOPOLOGY_ACKERS 设 置 为 0( config.setNumAckers(0) ),通过此方法,当 Spout 发送一个消 息的时候,它的 ack 方法将立刻被调用,这样就不能保证消息的 可靠性;
- Spout 发送一个消息时,不指定此消息的 messageID。这样 spout 中的 ack 和 fail 方法将不会被调用,即使 acker 参数不为0,但 这样就不能控制失败后是否重发消息。即在 spout 的 nextTuple 方法中这样发射数据:collector.emit(new Values(l));
- 如果你不在意某个消息派生出来的子孙消息的可靠性,则此消息 派生出来的子消息在发送时不要做锚定, 即在 emit 方法中不指定 输入消息。因为这些子孙消息没有被锚定在任何tuple tree 中 , 因此他们的失败不会引起任何 spout 重新发送消息。
开启使用可靠性(同时满足):
- 设置 Ackers 的数量为非0;conf.setNumAckers(1);
- 在 spout 发射消息时,绑定 messageId;collector.emit(new Values(l), messageId);
- 在 bolt 中 发 射 消 息 时 , 需 要 锚 定 到 上 一 个 消 息 上 ; collector.emit(input, new Values(word));
六、注意
这种模式,每发送一个消息,都会同步发送一个ack/fail,对于网络的带宽会有一定的消耗,如果对于可靠性要求不高,可通过使用不同的 emit 接口关闭该模式。