I have a PCollection
of a key-value pair where the value is a Double. I need to compute the total number of values and their average.
我有一个键值对的PCollection,其值为Double。我需要计算值的总数及其平均值。
I see there are two transforms - Count
and Mean
. But I can't find a way to apply them simultaneously in a GroupBy operation.
我看到有两种变换 - 计数和均值。但我找不到在GroupBy操作中同时应用它们的方法。
It looks like my options are to either implement my own combine method that will implement both counting and averaging, or to apply Count and Mean separately and then join them on the original key.
看起来我的选择是要么实现我自己的组合方法,它将实现计数和平均,或者分别应用Count和Mean,然后将它们连接到原始键上。
Is there a third way to do it?
还有第三种方法吗?
Thanks, G
谢谢,G
1 个解决方案
#1
0
I would say that the proper way to do that is to write your own DoFn and use it after a GroupByKey transformation:
我想说正确的方法是编写自己的DoFn并在GroupByKey转换后使用它:
static class CountAndMean extends DoFn<KV<String, Iterator<Double>>, String> {
@Override
public void processElement(ProcessContext c) {
long count = 0L;
double sum = 0.0;
for(Double v: c.element().getValue()){
sum += v.doubleValue();
count += 1L;
}
double mean = sum/count;
String out = c.element().getKey() + "," + String.valueOf(mean) + "," + String.valueOf(count);
c.output(out);
}
PCollection<KV<String, Double>> inCol = ... ;
PCollection<KV<String, Iterable<Double>>> perKeyCol = inCol.apply(GroupByKey.<String, Double>create());
PCollection<String> outCol = perKeyCol.apply(ParDo.named("CountAndMean").of(new CountAndMean()));
#1
0
I would say that the proper way to do that is to write your own DoFn and use it after a GroupByKey transformation:
我想说正确的方法是编写自己的DoFn并在GroupByKey转换后使用它:
static class CountAndMean extends DoFn<KV<String, Iterator<Double>>, String> {
@Override
public void processElement(ProcessContext c) {
long count = 0L;
double sum = 0.0;
for(Double v: c.element().getValue()){
sum += v.doubleValue();
count += 1L;
}
double mean = sum/count;
String out = c.element().getKey() + "," + String.valueOf(mean) + "," + String.valueOf(count);
c.output(out);
}
PCollection<KV<String, Double>> inCol = ... ;
PCollection<KV<String, Iterable<Double>>> perKeyCol = inCol.apply(GroupByKey.<String, Double>create());
PCollection<String> outCol = perKeyCol.apply(ParDo.named("CountAndMean").of(new CountAndMean()));