Passing a side input in a PCollection Partition

Date: 2021-07-07 15:31:39

I want to pass a side input to a PCollection Partition and, based on it, divide my PCollection. Is there any way to do this?

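// partitionCount stands in for the value I want to obtain by counting a
// PCollection; Partition.of needs it as a plain int when the pipeline is built.
PCollectionList<TableRow> part = merged.apply(Partition.of(partitionCount, new PartitionFn<TableRow>() {

    @Override
    public int partitionFor(TableRow row, int numPartitions) {
        // Placeholder: every element currently goes to partition 0.
        return 0;
    }

}));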
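
For illustration, here is a minimal plain-Java sketch of the contract that partitionFor has to satisfy: it must return an index in the range [0, numPartitions). It runs without Beam on the classpath. The class name MonthPartitioner and the one-partition-per-month rule are assumptions made for this example, as is the guess that the date field holds yyyymmdd strings; none of this comes from the original post.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of the index contract behind a partition function. */
class MonthPartitioner {

    /**
     * Returns a partition index for a date string in yyyymmdd form.
     * Hypothetical rule: derive the index from the calendar month, then
     * reduce it modulo numPartitions so it stays in [0, numPartitions).
     */
    static int partitionFor(String yyyymmdd, int numPartitions) {
        int month = Integer.parseInt(yyyymmdd.substring(4, 6)); // 1..12
        return (month - 1) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 12;

        // One bucket per partition, standing in for the PCollectionList.
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }

        // Route each sample date to its bucket.
        for (String date : new String[] {"20210707", "20210115", "20210731"}) {
            partitions.get(partitionFor(date, numPartitions)).add(date);
        }

        for (int i = 0; i < numPartitions; i++) {
            System.out.println("partition " + i + ": " + partitions.get(i));
        }
    }
}
```

The same body could be placed inside a PartitionFn<TableRow> after reading the date field from the row. The number of partitions still has to be known when the pipeline is constructed.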

Is there any other way to partition my PCollection?

// Writing to a partitioned BigQuery table without DynamicDestinations

merge.apply("write into target", BigQueryIO.writeTableRows()
    .to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
        @Override
        public TableDestination apply(ValueInSingleWindow<TableRow> value) {
            TableRow row = value.getValue();
            TableReference reference = new TableReference();
            reference.setProjectId("XYZ");
            reference.setDatasetId("ABC");
            String str = row.get("authorized_transaction_date_yyyymmdd").toString();
            LOG.info("date of row " + str);
            // Replace the last two characters (the day) with "01" so that all
            // rows from the same month share one partition decorator.
            str = str.substring(0, str.length() - 2) + "01";
            LOG.info("str value " + str);
            reference.setTableId("TargetTable$" + str);
            return new TableDestination(reference, null);
        }
    })
    .withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
        @Override
        public TableRow apply(TableRow input) {
            LOG.info("format function:" + input.toString());
            return input;
        }
    })
    .withSchema(schema1)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

Now I have to use DynamicDestinations. Is there any solution instead of this? I still have to partition.

1 solution

#1


1  

Based on seeing TableRow in your code, I suspect that you want to write a PCollection to BigQuery, sending different elements to different BigQuery tables. BigQueryIO.write() already provides a method to do that, using BigQueryIO.write().to(DynamicDestinations). See Writing different values to different BigQuery tables in Apache Beam.
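
Whichever routing mechanism is used, the per-element decision comes down to computing a destination from each row. The sketch below isolates that step in plain Java so it can be run without Beam: it reproduces the month-start table ID that the question's code builds. The class and method names (MonthlyTableSpec, monthStart, tableIdFor) are hypothetical. Wiring this into a DynamicDestinations subclass, for example inside getDestination, is an assumption about how one might use it rather than something the answer spells out.

```java
/** Plain-Java sketch of the destination computation from the question's code. */
class MonthlyTableSpec {

    /** Replaces the day portion of a yyyymmdd string with "01". */
    static String monthStart(String yyyymmdd) {
        return yyyymmdd.substring(0, yyyymmdd.length() - 2) + "01";
    }

    /** Builds the table ID with a partition decorator, as in the question. */
    static String tableIdFor(String yyyymmdd) {
        return "TargetTable$" + monthStart(yyyymmdd);
    }

    public static void main(String[] args) {
        // Rows from the same month resolve to the same table ID.
        System.out.println(tableIdFor("20210707"));
        System.out.println(tableIdFor("20210731"));
        System.out.println(tableIdFor("20210815"));
    }
}
```

Keeping this computation in a small pure function makes it easy to unit test before it is attached to the BigQuery write.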
