Dynamic table name when writing to BQ from a Dataflow pipeline

Time: 2022-07-13 15:21:36

As a followup question to the following question and answer:

https://*.com/questions/31156774/about-key-grouping-with-groupbykey

I'd like to confirm with the Google Dataflow engineering team (@jkff) whether the third option proposed by Eugene is possible at all with Google Dataflow:

"have a ParDo that takes these keys and creates the BigQuery tables, and another ParDo that takes the data and streams writes to the tables"

My understanding is that a ParDo/DoFn processes each element individually. How can we specify a table name (a function of the keys passed in from side inputs) when writing out from the processElement of a ParDo/DoFn?

Thanks.

Updated with a DoFn, which obviously does not work, since c.element().getValue() is not a PCollection.

PCollection<KV<String, Iterable<String>>> output = ...;

public class DynamicOutput2Fn extends DoFn<KV<String, Iterable<String>>, Integer> {

    private final PCollectionView<List<String>> keysAsSideinputs;

    public DynamicOutput2Fn(PCollectionView<List<String>> keysAsSideinputs) {
        this.keysAsSideinputs = keysAsSideinputs;
    }

    @Override
    public void processElement(ProcessContext c) {
        List<String> keys = c.sideInput(keysAsSideinputs);
        String key = c.element().getKey();

        // The below is not working! getValue() returns an Iterable, not a PCollection.
        // How could we write the value out to a sink, be it a GCS file or a BQ table?
        c.element().getValue().apply(ParDo.of(new FormatLineFn()))
                .apply(TextIO.Write.to(key));

        c.output(1);
    }
}

1 solution

#1


The BigQueryIO.Write transform does not support this. The closest thing you can do is to use per-window tables, and encode whatever information you need to select the table in the window objects by using a custom WindowFn.

If you don't want to do that, you can make BigQuery API calls directly from your DoFn. That way you can set the table name to anything your code computes: it can be looked up from a side input, or derived directly from the element the DoFn is currently processing. To avoid making too many small calls to BigQuery, you can batch up the requests and flush them in finishBundle().
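The buffering idea can be sketched in plain Java, without the Dataflow SDK, roughly as follows. Everything named here is an assumption made for illustration: `TableInserter` is a hypothetical stand-in for whatever BigQuery client call you end up using, `BatchingTableWriter` only mimics the processElement / finishBundle lifecycle of a DoFn, and the `events_` table prefix is made up. Treat it as a sketch of the batching pattern, not as the SDK's own implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a BigQuery streaming-insert call (not a real SDK type). */
interface TableInserter {
    void insertAll(String tableName, List<String> rows);
}

/** Mimics a DoFn's processElement / finishBundle lifecycle; buffers rows per table. */
class BatchingTableWriter {
    private final TableInserter inserter;
    private final int maxRowsPerTable;
    // Rows waiting to be written, grouped by destination table name.
    private final Map<String, List<String>> buffer = new HashMap<>();

    BatchingTableWriter(TableInserter inserter, int maxRowsPerTable) {
        this.inserter = inserter;
        this.maxRowsPerTable = maxRowsPerTable;
    }

    /** Called once per element: the table name is computed from the element's key. */
    void processElement(String key, Iterable<String> values) {
        String table = tableNameFor(key);
        List<String> rows = buffer.computeIfAbsent(table, t -> new ArrayList<>());
        for (String value : values) {
            rows.add(value);
            // Flush early so a single large key cannot grow the buffer without bound.
            if (rows.size() >= maxRowsPerTable) {
                inserter.insertAll(table, new ArrayList<>(rows));
                rows.clear();
            }
        }
    }

    /** Called at the end of a bundle: write out whatever is still buffered. */
    void finishBundle() {
        for (Map.Entry<String, List<String>> entry : buffer.entrySet()) {
            if (!entry.getValue().isEmpty()) {
                inserter.insertAll(entry.getKey(), new ArrayList<>(entry.getValue()));
            }
        }
        buffer.clear();
    }

    /** Assumed naming scheme: a fixed prefix plus the key, with unsafe characters replaced. */
    static String tableNameFor(String key) {
        return "events_" + key.replaceAll("[^A-Za-z0-9_]", "_");
    }
}
```

In a real DoFn the two methods would be driven by the runner, the buffer would have to be created per bundle (for example in startBundle) rather than serialized with the function, and `insertAll` would wrap the actual BigQuery request, including creating the table if it does not exist yet.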

You can see how the Dataflow runner does the streaming import here: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java
