Tuple元组

时间:2022-02-22 20:51:19

Tuple元组

  Tuple 是 Storm 的主要数据结构,并且是 Storm 中使用的最基本单元、数据模型和元组。

Tuple 描述

  Tuple 就是一个值列表, Tuple 中的值可以是任何类型的,动态类型的Tuple的fields可以不用声明;默认情况下,Storm中的Tuple支持私有类型、字符串、字节数组等作为它的字段值,如果使用其他类型,就需要序列化该类型。

  Tuple 的 字 段 默 认 类 型 有: integer、 float、 double、 long、short、 string、 byte、 binary(byte[])。
  Tuple 数据结构如图 3-1 所示。

Tuple元组

  图 3-1 Tuple 数据结构

  Tuple 可以理解成键值对。例如,创建一个Bolt 要发送两个字段(命名为 double 和 triple),其中键就是定义在declareOutputFields 方法中的 Fields 对象,值就是在 emit 方法中发送的 Values 对象。以下是一个简单例子。

public class DoubleAndTripleBolt extends BaseRichBolt {
OutputCollectorBase _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
_collector = collector;
}
@Override
public void execute(Tuple input) {
int val = input.getInteger(0);
_collector.emit(input, new Values(val*2, val*3));
_collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("double", "triple"));
}}

  此外,在使用的 Storm Java 包中, backtype.storm.tuple 主要有以下几个类:
‰ Fileds.class。
‰ MessageId.class。
‰ Tuple.class。
‰ TupleImpl.class。
‰ Values.class。
  列出以上内容是为了更好地理解 Tuple,这样能够从本质上理解 Tuple,在使用时更加得
心应手。

Tuple 的生命周期

  了解一个 Tuple 的生命周期就需要查看源码,如下的 Java 代码展示了 Spout(消息源)接口发出 Tuple(消息)的整个过程。
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
void close();
}

  首 先, Storm 调 用 Spout(消息源)的nextTuple 方法来获取下一个Tuple, Spout通过Open 方法的参数提供的SpoutOutputCollector将新Tuple发射到其中一个输出消息流。

注意:发射Tuple 时, Spout提供一个message-id,通过这个ID 来追踪该Tuple。

  接下来, Storm跟踪该Tuple 的树形结构是否成功创建,并根据 messageid 调用 Spout 中的 ack 函数,以确认 Tuple 是否被完全处理。如果Tuple超时,则调用 Spout 的 fail 方法。

  由此看出,同一个Tuple 不管是acked,还是failed 都是由创建它的Spout发出并维护的,所以,即使Spout 在集群环境中同时执行很多的任务,该Tuple 也不会被其他任务调用或生成 acked 或 failed 状态。总之, Storm 会利用内部的 Acker 机制保证每个 Tuple 被可靠地处理。最后,在任务完成后,Spout调用Close 方法结束 Tuple 的使命。