I want to pass a side input to a PCollection Partition and divide my PCollection based on it. Is there any way to do that?
PCollectionList<TableRow> part = merged.apply(Partition.of(/* PCollection count function called here */, new PartitionFn<TableRow>() {
    @Override
    public int partitionFor(TableRow arg0, int arg1) {
        return 0;
    }
}));
Is there any other way I can partition my PCollection?
// Writing to a partitioned BigQuery table without DynamicDestinations
merge.apply("write into target", BigQueryIO.writeTableRows()
    .to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
        @Override
        public TableDestination apply(ValueInSingleWindow<TableRow> value) {
            TableRow row = value.getValue();
            TableReference reference = new TableReference();
            reference.setProjectId("XYZ");
            reference.setDatasetId("ABC");
            System.out.println("date of row " + row.get("authorized_transaction_date_yyyymmdd").toString());
            LOG.info("date of row " + row.get("authorized_transaction_date_yyyymmdd").toString());
            String str = row.get("authorized_transaction_date_yyyymmdd").toString();
            str = str.substring(0, str.length() - 2) + "01";
            System.out.println("str value " + str);
            LOG.info("str value " + str);
            reference.setTableId("TargetTable$" + str);
            return new TableDestination(reference, null);
        }
    })
    .withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
        @Override
        public TableRow apply(TableRow input) {
            LOG.info("format function:" + input.toString());
            return input;
        }
    })
    .withSchema(schema1)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
Now I have to use DynamicDestinations instead of this, and I also have to do the partitioning. Is there any solution?
1 solution
#1
1
Based on seeing TableRow in your code, I suspect that you want to write a PCollection to BigQuery, sending different elements to different BigQuery tables. BigQueryIO.write() already provides a method to do that, using BigQueryIO.write().to(DynamicDestinations). See Writing different values to different BigQuery tables in Apache Beam.
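As a rough illustration of the per-element routing logic, here is a minimal sketch in plain Java (no Beam dependency) that derives the month-partition table spec from the yyyymmdd date, mirroring the substring logic in the question. The class name MonthPartitionTable and its methods are hypothetical, and the input validation is my own addition. In a pipeline, something like this could be called from DynamicDestinations.getDestination, with getTable wrapping the resulting string in a TableDestination; that wiring is an assumption and is not shown here.

```java
import java.util.Objects;

/** Hypothetical helper (not part of Beam): builds a month-partition table spec. */
final class MonthPartitionTable {
    private MonthPartitionTable() {}

    /** Maps a yyyymmdd date to the first day of its month, e.g. "20180317" -> "20180301". */
    static String firstOfMonth(String yyyymmdd) {
        Objects.requireNonNull(yyyymmdd, "date must not be null");
        // Assumption: the field always holds exactly eight digits.
        if (!yyyymmdd.matches("\\d{8}")) {
            throw new IllegalArgumentException("expected yyyymmdd, got: " + yyyymmdd);
        }
        return yyyymmdd.substring(0, 6) + "01";
    }

    /** Builds a "project:dataset.table$partition" spec string. */
    static String tableSpec(String project, String dataset, String table, String yyyymmdd) {
        return project + ":" + dataset + "." + table + "$" + firstOfMonth(yyyymmdd);
    }

    public static void main(String[] args) {
        // Placeholder project, dataset, and table names taken from the question.
        System.out.println(tableSpec("XYZ", "ABC", "TargetTable", "20180317"));
        // prints XYZ:ABC.TargetTable$20180301
    }
}
```

Keeping this logic in a small pure function makes it easy to unit test separately from the pipeline, and it rejects malformed dates early instead of producing an invalid partition decorator.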