I have a set of records that have sequential IDs. The set of records is sorted by the ID field. I would like to take a rolling window of 10 records (i.e. records 0-9, then records 1-10, records 2-11, etc.). In Python and Pandas the following works perfectly:
windows = my_data_frame.rolling(10).sum().dropna()
I would like to accomplish the same thing in Apache Beam. Reading the Apache Beam Programming Guide, it appears that the only way to do windowing is by timestamp. Furthermore, this example implies that in order to accomplish windowing by record I would need to add an arbitrary timestamp to each record and then window on that timestamp.
I am very new to Apache Beam, and since windowing by record is essentially one line in Pandas, I feel that I am missing something in Beam that would allow me to accomplish this.
2 Answers
#1
You can inject your sequential IDs as event timestamps and use SlidingWindows with a size of 10 and a period of 1. The time unit depends on how you transform the sequential IDs into event timestamps.
The code would look like this:
pipeline
    .apply(TextIO.read().from(...))
    .apply(ParDo.of(new DoFn<String, Long>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // Attach an event timestamp derived from the record's sequential ID.
        c.outputWithTimestamp(..., /* timestamp transformed from the sequential ID */);
      }
    }))
    .apply(Window.into(SlidingWindows.of(/* 10 time units */).every(/* 1 time unit */)))
    .apply(Sum.longsGlobally());
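One concrete way to fill in the ID-to-timestamp conversion (an assumption about the input format, not something given in the answer): treat each sequential ID as a millisecond since the epoch, so the window above becomes SlidingWindows.of(Duration.millis(10)).every(Duration.millis(1)). A sketch of the DoFn, assuming input lines of the form "id,value":

import org.joda.time.Instant;

// Parses "id,value" lines and emits the value stamped with Instant(id),
// i.e. sequential ID i becomes event time i milliseconds after the epoch.
class AttachIdTimestampFn extends DoFn<String, Long> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    String[] parts = c.element().split(",");
    long id = Long.parseLong(parts[0]);
    long value = Long.parseLong(parts[1]);
    c.outputWithTimestamp(value, new Instant(id));
  }
}

With that convention, each window covers exactly 10 consecutive IDs and slides by one, matching the pandas rolling(10) behavior.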
#2
Since you already have the set of records, you can write a batch pipeline. For each record, you can map it to a key/value pair, where the key is, say, id / 10, and the value is the record. Then you can perform a GroupByKey, which groups (or, in your words, windows) 10 records under one key.
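Note that keying by id / 10 produces non-overlapping batches of 10 rather than a rolling window; for overlapping windows each record has to be emitted under every window key it belongs to. A minimal sketch of that fan-out step (the PCollection name and the KV<Long, Long> record shape are assumptions for illustration, not from the answer):

// Fan each record out to every rolling window that contains it.
// With window size 10 and step 1, a record with ID i belongs to the
// windows starting at max(0, i - 9) through i.
PCollection<KV<Long, Long>> keyed = records.apply(
    ParDo.of(new DoFn<KV<Long, Long>, KV<Long, Long>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        long id = c.element().getKey();
        long value = c.element().getValue();
        for (long start = Math.max(0, id - 9); start <= id; start++) {
          c.output(KV.of(start, value)); // key = ID of the window start
        }
      }
    }));

A GroupByKey, or the Sum transform below, then collapses each window key into one result per window.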
In your example, if you want to sum every 10 elements, you can also use the Sum transform (after you attach a key to each record). An example from https://beam.apache.org/documentation/sdks/javadoc/0.5.0/org/apache/beam/sdk/transforms/Sum.html:
PCollection<KV<String, Integer>> input = ...;
PCollection<KV<String, Integer>> sumPerKey = input
.apply(Sum.<String>integersPerKey());