Writing different values to different BigQuery tables in Apache Beam

Time: 2022-05-10 01:15:09

Suppose I have a PCollection<Foo> and I want to write it to multiple BigQuery tables, choosing a potentially different table for each Foo.

How can I do this using the Apache Beam BigQueryIO API?

1 Answer

#1



This is possible using a feature recently added to BigQueryIO in Apache Beam.

PCollection<Foo> foos = ...;
foos.apply(BigQueryIO.write().to(new SerializableFunction<ValueInSingleWindow<Foo>, TableDestination>() {
  @Override
  public TableDestination apply(ValueInSingleWindow<Foo> value) {  
    Foo foo = value.getValue();
    // Also available: value.getWindow(), getTimestamp(), getPane()
    String tableSpec = ...;
    String tableDescription = ...;
    return new TableDestination(tableSpec, tableDescription);
  }
}).withFormatFunction(new SerializableFunction<Foo, TableRow>() {
  @Override
  public TableRow apply(Foo foo) {
    return ...;
  }
}).withSchema(...));

Depending on whether the input PCollection<Foo> is bounded or unbounded, under the hood this will either create multiple BigQuery import jobs (one or more per table depending on amount of data), or it will use the BigQuery streaming inserts API.

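In newer Beam releases this choice can also be made explicit via `withMethod`, rather than being inferred from boundedness. A minimal sketch in the same style as above (the table spec, format function, and `schema` variable are placeholders):

```java
foos.apply(BigQueryIO.<Foo>write()
    .to("project-id:dataset.table")            // placeholder table spec
    .withFormatFunction(foo -> new TableRow()) // populate fields as needed
    .withSchema(schema)                        // a TableSchema defined elsewhere
    // FILE_LOADS = batch import jobs; STREAMING_INSERTS = the streaming API.
    .withMethod(BigQueryIO.Write.Method.FILE_LOADS));
```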
The most flexible version of the API uses DynamicDestinations, which allows you to write different values to different tables with different schemas, and even allows you to use side inputs from the rest of the pipeline in all of these computations.

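A DynamicDestinations-based write looks roughly like the sketch below. It keys each element to a destination via a `String`; the `getKind()` accessor on `Foo`, the table naming scheme, and the `payload` field are all hypothetical:

```java
foos.apply(BigQueryIO.<Foo>write()
    .to(new DynamicDestinations<Foo, String>() {
      @Override
      public String getDestination(ValueInSingleWindow<Foo> element) {
        // The key identifying the target table; getKind() is hypothetical.
        return element.getValue().getKind();
      }

      @Override
      public TableDestination getTable(String kind) {
        return new TableDestination(
            "project-id:dataset.table_" + kind, "Table for kind " + kind);
      }

      @Override
      public TableSchema getSchema(String kind) {
        // Each destination key may map to a different schema.
        return new TableSchema().setFields(Arrays.asList(
            new TableFieldSchema().setName("payload").setType("STRING")));
      }
    })
    .withFormatFunction(foo -> new TableRow().set("payload", foo.toString())));
```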
Additionally, BigQueryIO has been refactored into a number of reusable transforms that you can combine yourself to implement more complex use cases - see the files in the source directory.

This feature will be included in the first stable release of Apache Beam and in the next release of the Dataflow SDK (which will be based on the first stable release of Apache Beam). Right now you can use it by running your pipeline against a snapshot of Beam at HEAD from GitHub.
