I am trying to implement a Reshuffle transform to prevent excessive fusion, but I don't know how to adapt the version for PCollection<KV<String,String>> so that it works with plain PCollections. (How to reshuffle a PCollection<KV<String,String>> is described here.)
How would I expand the official Avro I/O example code to reshuffle before adding more steps in my pipeline?
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

Schema schema = new Schema.Parser().parse(new File("schema.avsc"));

PCollection<GenericRecord> records =
    p.apply(AvroIO.Read.named("ReadFromAvro")
        .from("gs://my_bucket/path/records-*.avro")
        .withSchema(schema));
1 solution
Thanks to the code snippet provided by the Google support team I figured it out:
To get a reshuffled PCollection:
PCollection<T> reshuffled = data.apply(Repartition.of());
The Repartition class used:
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

import java.util.concurrent.ThreadLocalRandom;

public class Repartition<T> extends PTransform<PCollection<T>, PCollection<T>> {

  private Repartition() {}

  public static <T> Repartition<T> of() {
    return new Repartition<T>();
  }

  @Override
  public PCollection<T> apply(PCollection<T> input) {
    // Key each element, group by that key, then drop the key again.
    return input
        .apply(ParDo.named("Add arbitrary keys").of(new AddArbitraryKey<T>()))
        .apply(GroupByKey.<Integer, T>create())
        .apply(ParDo.named("Remove arbitrary keys").of(new RemoveArbitraryKey<T>()));
  }

  // Pairs each element with a random int key.
  private static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
    @Override
    public void processElement(ProcessContext c) throws Exception {
      c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
    }
  }

  // Emits every value of each group and discards the key.
  private static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
    @Override
    public void processElement(ProcessContext c) throws Exception {
      for (T s : c.element().getValue()) {
        c.output(s);
      }
    }
  }
}
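To make the three steps inside Repartition easier to follow, here is a minimal plain-Java sketch of the same idea (add a random key, group by key, drop the key) that runs without the Dataflow SDK. The class RepartitionSketch and its methods are hypothetical names made up for this illustration, and an in-memory List stands in for the PCollection. It only shows that every element survives the round trip while the order may change. It does not show the fusion behaviour, which comes from the runner treating the GroupByKey step as a boundary.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical stand-in for the Repartition transform, using plain collections.
final class RepartitionSketch {

    // Steps 1 and 2: pair each element with a random int key and group by that key.
    static <T> Map<Integer, List<T>> groupByRandomKey(List<T> input) {
        Map<Integer, List<T>> groups = new HashMap<>();
        for (T element : input) {
            int key = ThreadLocalRandom.current().nextInt();
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
        }
        return groups;
    }

    // Step 3: emit every value of every group and discard the keys.
    static <T> List<T> dropKeys(Map<Integer, List<T>> groups) {
        List<T> output = new ArrayList<>();
        for (List<T> values : groups.values()) {
            output.addAll(values);
        }
        return output;
    }

    // The full round trip: same elements out as in, in no particular order.
    static <T> List<T> repartition(List<T> input) {
        return dropKeys(groupByRandomKey(input));
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c", "a");
        List<String> output = repartition(input);
        System.out.println("in=" + input.size() + " out=" + output.size());
    }
}
```

Because the keys are random, nearly every group holds a single element, so the grouping step carries no meaning of its own and exists only to force a redistribution of the data.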