Reshaping data in Google Dataflow

Date: 2021-02-23 15:20:09

I currently have a streaming pipeline that processes events and pushes them to a BigQuery table named EventsTable:


TransactionID    EventType
1                typeA
1                typeB
1                typeB
1                typeC
2                typeA
2                typeC
3                typeA

I want to add a branch to my processing pipeline and also "group" together transaction-related data into a TransactionsTable. Roughly, the value in each type column of the TransactionsTable would be the count of the corresponding eventType for a given transaction. With the previous example events, the output would look like this:


TransactionID      typeA     typeB     typeC
1                  1         2         1
2                  1         0         1
3                  1         0         0

The number of "type" columns would be equal to the number of distinct eventType values that exist within the system.


I'm trying to see how I could do this with Dataflow, but cannot find any clean way to do it. I know that PCollections are immutable, so I cannot store the incoming data in a growing PCollection that queues incoming events until the other needed elements are present, at which point I could write them to the second BigQuery table. Is there some sort of windowing function in Dataflow that would allow this (like queuing the events in a temporary windowed structure with a sort of expiry date)?


I could probably do something with batch jobs and Pub/Sub, but this would be far more complex. On the other hand, I understand that Dataflow is not meant to hold ever-growing data structures, and that data, once it goes in, has to go through the pipeline and exit (or be discarded). Am I missing something?


1 solution

#1



In general, the easiest way to do this kind of "aggregate data across many events" is to use a CombineFn, which allows you to combine all of the values associated with a specific key. This is typically more efficient than just queueing the events, because it only needs to accumulate the result rather than accumulate all of the events.

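To make the accumulate/merge idea concrete outside of a pipeline, here is a minimal, self-contained sketch of the same pattern in plain Java. It uses String event types in place of the pipeline's EventType, and the "workers" are just two local maps; this illustrates the semantics only, not the Dataflow API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: the accumulate/merge pattern behind a CombineFn,
// using plain String event types in place of the pipeline's EventType.
public class TypedCountSketch {
  // addInput: fold one event into an accumulator.
  static Map<String, Long> addInput(Map<String, Long> accum, String event) {
    accum.merge(event, 1L, Long::sum);
    return accum;
  }

  // mergeAccumulators: combine partial counts produced on different workers.
  static Map<String, Long> mergeAccumulators(List<Map<String, Long>> accums) {
    Map<String, Long> merged = new HashMap<>();
    for (Map<String, Long> a : accums) {
      a.forEach((type, count) -> merged.merge(type, count, Long::sum));
    }
    return merged;
  }

  public static void main(String[] args) {
    // Events for transaction 1 from the question, split across two "workers".
    Map<String, Long> worker1 = new HashMap<>();
    addInput(worker1, "typeA");
    addInput(worker1, "typeB");
    Map<String, Long> worker2 = new HashMap<>();
    addInput(worker2, "typeB");
    addInput(worker2, "typeC");

    Map<String, Long> merged = mergeAccumulators(List.of(worker1, worker2));
    System.out.println(
        merged.get("typeA") + " " + merged.get("typeB") + " " + merged.get("typeC"));
    // prints "1 2 1", matching the expected row for transaction 1
  }
}
```

Only the final merged counts are ever held in memory, which is why this scales better than buffering every event.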

For your specific case, you could create a custom CombineFn. The accumulator would be a Map<EventType, Long>. For instance (the TransactionRow constructor at the end is hypothetical; build the row however your schema requires):


public class TypedCountCombineFn
    extends CombineFn<EventType, Map<EventType, Long>, TransactionRow> {
  @Override
  public Map<EventType, Long> createAccumulator() {
    return new HashMap<>();
  }
  @Override
  public Map<EventType, Long> addInput(
      Map<EventType, Long> accum, EventType input) {
    // Insert 1 for a new event type, or add 1 to the existing count.
    accum.merge(input, 1L, Long::sum);
    return accum;
  }
  @Override
  public Map<EventType, Long> mergeAccumulators(
      Iterable<Map<EventType, Long>> accums) {
    // Sum up the counts for matching event types across partial accumulators.
    Map<EventType, Long> merged = createAccumulator();
    for (Map<EventType, Long> accum : accums) {
      accum.forEach((type, count) -> merged.merge(type, count, Long::sum));
    }
    return merged;
  }
  @Override
  public TransactionRow extractOutput(Map<EventType, Long> accum) {
    // Build an output row from the per-event-type accumulator; event
    // types absent from the map should default to 0.
    return new TransactionRow(accum); // hypothetical constructor
  }
}

Applying this CombineFn can be done globally (across all transactions in a PCollection) or per-key (such as per transaction ID):


PCollection<EventType> pc = ...;

// Globally
PCollection<TransactionRow> globalCounts =
    pc.apply(Combine.globally(new TypedCountCombineFn()));

// Per key (here, keyed by transaction ID)
PCollection<KV<Long, EventType>> keyedPC =
    pc.apply(WithKeys.of(new SerializableFunction<EventType, Long>() {
      @Override
      public Long apply(EventType in) {
        return in.getTransactionId();
      }
    }));
PCollection<KV<Long, TransactionRow>> keyedCounts =
    keyedPC.apply(Combine.perKey(new TypedCountCombineFn()));
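One caveat for the streaming case, which ties back to the question about windowing: Combine.perKey on an unbounded PCollection only emits output once a windowing (or triggering) strategy decides a group is complete. A session window per transaction ID is one way to get the "queue events with a sort of expiry date" behavior described in the question. This is a sketch, and the 10-minute gap duration is an arbitrary example value, not a recommendation:

```java
// Sketch: close a transaction's group after 10 minutes with no new events,
// then run the CombineFn per transaction ID. The gap duration is illustrative.
PCollection<KV<Long, EventType>> windowed = keyedPC.apply(
    Window.<KV<Long, EventType>>into(
        Sessions.withGapDuration(Duration.standardMinutes(10))));
PCollection<KV<Long, TransactionRow>> perTransaction =
    windowed.apply(Combine.perKey(new TypedCountCombineFn()));
```

Once a session closes, the resulting TransactionRow can be written to the second BigQuery table.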
