BigQueryIO - write to two tables for each item

Date: 2021-07-05 15:25:45

I'm trying to use Apache Beam to write a job in Dataflow. This job needs to take an input and transform it into my custom object. The object represents a memory test, which contains fixed properties like timestamp, name ... as well as a list of partitions, each with its own properties:

public class TestResult {

    String testName;
    String testId;
    String testStatus;
    String testResult;
    List<Partition> testPartitions;
}
public class Partition {
    String testId;
    String filesystem;
    String mountedOn;
    String usePercentage;
    String available;
    String size;
    String used;
}

My last transform takes this TestResult object and turns it into table rows:

static class TestResultToRowConverter extends DoFn<TestResult, TableRow> {
    // Tag for the main output (one summary row per memory test).
    static final TupleTag<TableRow> mainRowsTag = new TupleTag<TableRow>() {};
    // Tag for the additional output (one row per partition).
    static final TupleTag<TableRow> partitionsTag = new TupleTag<TableRow>() {};

    /**
     * Converts a TestResult into a summary row for one table, plus one row per
     * partition for another table, emitted on separate output tags.
     */
    @ProcessElement
    public void processElement(ProcessContext c) {
      System.out.println("setting TestResult-> TestResult:" + c.element());
      // Main output: the memory-test summary row.
      c.output(new TableRow().set("testName", c.element().testName).set("testId", c.element().testId).set("testStatus", c.element().testStatus).set("testResult", c.element().testResult).set("memoryTestData", "example data test"));
      // Additional output: one row per partition, routed via partitionsTag.
      for (Partition pt : c.element().testPartitions) {
        c.output(partitionsTag, new TableRow().set("testId", pt.testId).set("filesystem", pt.filesystem).set("mountedOn", pt.mountedOn).set("usePercentage", pt.usePercentage).set("available", pt.available).set("size", pt.size).set("used", pt.used));
      }
    }
}

And now I want to write them to BigQuery: the test-result rows go to one table with a specific schema, and the partition rows go to another table with a different schema. Note also that there is an ID linking the two; it needs to be autogenerated when the TestResult row is added and reused when the partition rows are inserted.

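What I have in mind for the linking ID (just a sketch, assuming testId is the linking field and a random UUID is acceptable) is to generate it once per element inside the DoFn and stamp it on both kinds of rows:

String linkId = UUID.randomUUID().toString();  // java.util.UUID, generated once per TestResult

c.output(new TableRow()
    .set("testId", linkId)                    // autogenerated on the summary row
    .set("testName", c.element().testName));  // ... remaining test fields as before

for (Partition pt : c.element().testPartitions) {
  c.output(partitionsTag, new TableRow()
      .set("testId", linkId)                  // the same ID reused on each partition row
      .set("filesystem", pt.filesystem));     // ... remaining partition fields as before
}
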
How can I accomplish this?

I was using the snippet below to write to one table, but I'm lost as to how to write to two.

    .apply("MemoryTest_WriteToBigQuery", BigQueryIO.writeTableRows().to(TestResultToRowConverter.getTableSpec1())
        .withSchema(TestResultToRowConverter.getMemoryTestSchema())
        .withWriteDisposition(WriteDisposition.WRITE_APPEND))

EDIT:

Here is my pipeline:

PCollectionTuple results = pipeline
    .apply("ReadFromPubSubToBigQuery_MemoryTest", PubsubIO.readMessagesWithAttributes().fromTopic(options.getPubsubTopic()))
    .apply("MemoryTest_ProcessObject", ParDo.of(new ProcessTestResult()))
    .apply("MemoryTest_IdentifyMemoryTest", ParDo.of(new DetectTestType()))
    .apply("MemoryTest_TransformIntoTableRow",
        ParDo.of(new TestResultToRowConverter())
            .withOutputTags(TestResultToRowConverter.mainRowsTag, TupleTagList.of(TestResultToRowConverter.partitionsTag)));

results.get(TestResultToRowConverter.mainRowsTag)
    .apply("MemoryTest_WriteToBigQuery", BigQueryIO.writeTableRows().to(TestResultToRowConverter.getTableSpec1())
        .withSchema(TestResultToRowConverter.getMemoryTestSchema())
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));

1 Answer

#1

Beam pipelines are not limited to being a single straight line of transforms applied one after another - that would be very restrictive.

You can apply as many transforms as you want to any PCollection.

PCollection<TableRow> rows = ...;
rows.apply(BigQueryIO.writeTableRows().to(firstTableSpec));
rows.apply(BigQueryIO.writeTableRows().to(secondTableSpec));
rows.apply(someMoreProcessing)
    .apply(BigQueryIO.writeTableRows().to(thirdTableSpec));
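
Applied to the pipeline in the question: the multi-output ParDo yields a PCollectionTuple, and each tagged collection gets its own BigQueryIO write. A minimal sketch, assuming the tags declared on TestResultToRowConverter; getTableSpec2() and getPartitionsSchema() are hypothetical helpers mirroring the existing ones, not from the original post:

PCollectionTuple results = input.apply("MemoryTest_TransformIntoTableRow",
    ParDo.of(new TestResultToRowConverter())
        .withOutputTags(TestResultToRowConverter.mainRowsTag,
            TupleTagList.of(TestResultToRowConverter.partitionsTag)));

// Memory-test summary rows go to the first table.
results.get(TestResultToRowConverter.mainRowsTag)
    .apply("MemoryTest_WriteToBigQuery", BigQueryIO.writeTableRows()
        .to(TestResultToRowConverter.getTableSpec1())
        .withSchema(TestResultToRowConverter.getMemoryTestSchema())
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));

// Partition rows go to a second table with their own schema.
// getTableSpec2() and getPartitionsSchema() are hypothetical helpers.
results.get(TestResultToRowConverter.partitionsTag)
    .apply("Partitions_WriteToBigQuery", BigQueryIO.writeTableRows()
        .to(TestResultToRowConverter.getTableSpec2())
        .withSchema(TestResultToRowConverter.getPartitionsSchema())
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));

Each write is just another transform applied to the relevant collection, so the two tables can have different table specs, schemas, and write dispositions.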
