I'm using Google Cloud Dataflow and have a ParDo function that requires access to all the elements in a PCollection. To accomplish this, I wanted to convert a PCollection<T> into a PCollection<Iterable<T>> containing a single Iterable of all the elements. I was wondering if there's a cleaner/simpler/faster solution to what I have come up with.
我正在使用Google Cloud Dataflow并具有ParDo功能,需要访问PCollection中的所有元素。为了实现这一点,我想将PCollection
The first approach was to create a dummy key, perform a GroupByKey, and get the values afterwards.
第一种方法是创建一个虚拟密钥,执行GroupByKey,然后获取值。
PCollection<MyType> myData;
// AddDummyKey() outputs KV.of(1, context.element()) for everything
PCollection<KV<Integer, MyType>> myDataKeyed = myData.apply(ParDo.of(new AddDummyKey()));
// Group by dummy key
PCollection<KV<Integer, Iterable<MyType>>> myDataGrouped = myDataKeyed.apply(GroupByKey.create());
// Extract values
PCollection<Iterable<MyType>> myDataIterable = myDataGrouped.apply(Values.<Iterable<MyType>>create()
The second approach followed the recommendation here: How do I make View's asList() sortable in Google Dataflow SDK? but without the sorting. I created a View.asList(), created a dummy PCollection, and then applied a ParDo function on the dummy PCollection with the view as a side input and simply returned the view.
第二种方法遵循以下建议:如何在Google Dataflow SDK中对View的asList()进行排序?但没有排序。我创建了一个View.asList(),创建了一个虚拟PCollection,然后在虚拟PCollection上应用了一个ParDo函数,并将视图作为侧输入,并简单地返回了视图。
PCollection<MyType> myData;
// Create view of the PCollection as a list
PCollectionView<List<MyType>> myDataView = myData.apply(View.asList());
// Create dummy PCollection
PCollection<Integer> dummy = pipeline.apply(Create.<Integer>of(1));
// Apply dummy ParDo that returns the view
PCollection<List<MyType>> myDataList = dummy.apply(
ParDo.withSideInputs(myDataView).of(new DoFn<Integer, List<MyType>>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.sideInput(myDataView));
}
}));
It seems like there would be a pre-defined combine function for this task, but I can't find one. Thanks for the help!
看起来这个任务会有一个预先定义的组合函数,但我找不到一个。谢谢您的帮助!
1 个解决方案
#1
1
If you know you need the whole thing, then both of your approaches are reasonable. Both have been used in the Dataflow SDK and later when it became the Apache Beam SDK.
如果你知道你需要整个事情,那么你的两种方法都是合理的。两者都已在Dataflow SDK中使用,后来在它成为Apache Beam SDK时使用。
- Side input and then output the whole thing: This is how
DataflowAssert
works, in fact. In Beam, where different backend runners may implement side inputs differently, you should preferView.asIterable()
since it has fewer assumptions and may allow more streaming of a very large side input. - 侧输入然后输出整个事物:事实上,这就是DataflowAssert的工作原理。在Beam中,不同的后端运行程序可能以不同的方式实现侧输入,您应该更喜欢View.asIterable(),因为它具有较少的假设并且可能允许更大的侧输入流。
- Group by a single key and then drop the key: This is how Beam's successor
PAssert
works. It accomplishes the same thing, requires a little more care for empty collections, but more Beam runners have goodGroupByKey
support than side input support (especially when they are new and still under development). - 按一个键分组,然后放下键:这就是Beam的继任者PAssert的工作原理。它完成同样的事情,需要更多关心空集合,但更多的梁运行者具有良好的GroupByKey支持而不是侧输入支持(特别是当它们是新的并且仍在开发中时)。
So View.asIterable()
is basically intended to be just what you are asking for. There have also been some requests for a GroupGlobally
transform that does the second version; that could happen at some point.
所以View.asIterable()基本上就是你想要的。还有一些GroupGlobally转换请求执行第二个版本;这可能发生在某个时刻。
#1
1
If you know you need the whole thing, then both of your approaches are reasonable. Both have been used in the Dataflow SDK and later when it became the Apache Beam SDK.
如果你知道你需要整个事情,那么你的两种方法都是合理的。两者都已在Dataflow SDK中使用,后来在它成为Apache Beam SDK时使用。
- Side input and then output the whole thing: This is how
DataflowAssert
works, in fact. In Beam, where different backend runners may implement side inputs differently, you should preferView.asIterable()
since it has fewer assumptions and may allow more streaming of a very large side input. - 侧输入然后输出整个事物:事实上,这就是DataflowAssert的工作原理。在Beam中,不同的后端运行程序可能以不同的方式实现侧输入,您应该更喜欢View.asIterable(),因为它具有较少的假设并且可能允许更大的侧输入流。
- Group by a single key and then drop the key: This is how Beam's successor
PAssert
works. It accomplishes the same thing, requires a little more care for empty collections, but more Beam runners have goodGroupByKey
support than side input support (especially when they are new and still under development). - 按一个键分组,然后放下键:这就是Beam的继任者PAssert的工作原理。它完成同样的事情,需要更多关心空集合,但更多的梁运行者具有良好的GroupByKey支持而不是侧输入支持(特别是当它们是新的并且仍在开发中时)。
So View.asIterable()
is basically intended to be just what you are asking for. There have also been some requests for a GroupGlobally
transform that does the second version; that could happen at some point.
所以View.asIterable()基本上就是你想要的。还有一些GroupGlobally转换请求执行第二个版本;这可能发生在某个时刻。