I'm trying to solve what seems to be an easy problem: counting how many elements there are in a PCollection per window. I need to pass that count to the .withSharding() function on write, so that there are as many shards as there will be files to write.
Here is what I tried:
FileIO.writeDynamic<Long, E>()
    .withDestinationCoder(AvroCoder.of(Long::class.java))
    .by { e -> e.key }
    .via(Contextful.fn(MySerFunction()))
    .withNaming({ key -> MyFileNaming() })
    .withSharding(ShardingFn())
    .to("gs://some-output")
class ShardingFn : PTransform<PCollection<E>, PCollectionView<Int>>() {
    override fun expand(input: PCollection<E>): PCollectionView<Int> {
        val keys: PCollection<Long> = input.apply(Keys.create())
        // This only works with global windowing; how do I count per window?
        val count: PCollection<Long> = keys.apply(Count.globally())
        // Long2Int (not shown) converts the Long count to an Int.
        val int: PCollection<Int> = count.apply(MapElements.via(Long2Int))
        return int.apply(View.asSingleton())
    }
}
However, this works only as long as I have global windowing (aka "batch mode"), otherwise Count.globally() will throw an exception.
Maybe this is the wrong approach for writing, but even so: if I ever need to count elements per window for some other reason, how do I do that?
2 solutions
#1
1
Using Combine.globally(Count.<T>combineFn()).withoutDefaults() instead of Count.globally() should work in your case. This can also be found in the Javadoc: https://beam.apache.org/documentation/sdks/javadoc/2.5.0/org/apache/beam/sdk/transforms/Count.html#globally--
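Applied to the sharding transform from the question, the suggestion might look roughly like the sketch below. It is untested and makes a few assumptions: the class name PerWindowShardingFn and the helper toShardCount are made up for illustration, the transform is generic over the element type instead of using the question's E, and it counts elements directly rather than extracting keys first. Because withoutDefaults() emits nothing for an empty window, the singleton view has no value for such windows.

```kotlin
import org.apache.beam.sdk.transforms.Combine
import org.apache.beam.sdk.transforms.Count
import org.apache.beam.sdk.transforms.MapElements
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.transforms.View
import org.apache.beam.sdk.values.PCollection
import org.apache.beam.sdk.values.PCollectionView
import org.apache.beam.sdk.values.TypeDescriptors

// Hypothetical helper: narrow a Long count to an Int, capping at Int.MAX_VALUE.
fun toShardCount(count: Long): Int = minOf(count, Int.MAX_VALUE.toLong()).toInt()

// Hypothetical name; generic so the sketch does not depend on the question's E.
class PerWindowShardingFn<T> : PTransform<PCollection<T>, PCollectionView<Int>>() {
    override fun expand(input: PCollection<T>): PCollectionView<Int> {
        // withoutDefaults() lets the combine run under non-global windowing:
        // one count per non-empty window, no output for empty windows.
        val count: PCollection<Long> = input.apply(
            Combine.globally<T, Long>(Count.combineFn<T>()).withoutDefaults()
        )
        val shards: PCollection<Int> = count.apply(
            MapElements.into(TypeDescriptors.integers())
                .via(SerializableFunction<Long, Int> { toShardCount(it) })
        )
        return shards.apply(View.asSingleton<Int>())
    }
}
```

It would then be passed as .withSharding(PerWindowShardingFn()) in place of ShardingFn().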
#2
1
To count the data per window, the elements need timestamps (add them if the data has none); you then window the collection and count within each window. I recommend reviewing this example, as it explains in detail how to do so.
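As a rough illustration of the timestamp-then-count idea (not the example the answer links to), a per-window count could be sketched as follows. The Event class, its epochMillis field, the eventTime helper, and the one-minute fixed windows are all assumptions made for this sketch. WithTimestamps is intended for elements that carry no meaningful timestamp yet; by default it rejects moving an existing timestamp backwards.

```kotlin
import java.io.Serializable
import org.apache.beam.sdk.transforms.Combine
import org.apache.beam.sdk.transforms.Count
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.transforms.WithTimestamps
import org.apache.beam.sdk.transforms.windowing.FixedWindows
import org.apache.beam.sdk.transforms.windowing.Window
import org.apache.beam.sdk.values.PCollection
import org.joda.time.Duration
import org.joda.time.Instant

// Hypothetical element type carrying its own event time in epoch milliseconds.
data class Event(val key: Long, val epochMillis: Long) : Serializable

// Hypothetical helper: derive the Beam timestamp from the element itself.
fun eventTime(e: Event): Instant = Instant(e.epochMillis)

fun countPerWindow(events: PCollection<Event>): PCollection<Long> =
    events
        // Attach a timestamp to every element.
        .apply(WithTimestamps.of(SerializableFunction<Event, Instant> { eventTime(it) }))
        // Assign elements to one-minute fixed windows (window size is an assumption).
        .apply(Window.into<Event>(FixedWindows.of(Duration.standardMinutes(1))))
        // Emit one count per non-empty window.
        .apply(Combine.globally<Event, Long>(Count.combineFn<Event>()).withoutDefaults())
```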