Handling empty PCollections with BigQuery in Apache Beam

Date: 2022-03-15 01:10:19

With the following code, I get the error below when trying to write to BigQuery.

I am using Apache-Beam 2.0.0

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException

If I change the text.startsWith argument to "D", everything works fine (i.e. something is actually output).

Is there some way to catch or watch for empty PCollections?
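As far as I know, Beam cannot prune a sink from the pipeline graph based on emptiness, because the graph is fixed at construction time. What it can do is observe emptiness at run time: `Count.globally()` always emits exactly one element (zero for an empty input), so a downstream DoFn can at least log a warning. A minimal sketch of that idea, assuming Beam 2.x on the direct runner (class and message names are illustrative):

```java
import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class EmptyCheck {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

        // Same filter as in the question: nothing starts with "X",
        // so this PCollection is empty.
        PCollection<String> filtered = pipeline
            .apply(Create.of(Arrays.asList("Dog", "Cat", "Goldfish")))
            .apply(Filter.by(s -> s.startsWith("X")));

        // Count.globally() emits a single Long even on empty input,
        // so this DoFn always runs exactly once.
        filtered.apply(Count.globally())
            .apply(ParDo.of(new DoFn<Long, Void>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    if (c.element() == 0L) {
                        System.err.println("Warning: input PCollection was empty");
                    }
                }
            }));

        pipeline.run().waitUntilFinish();
    }
}
```

This only detects the condition; it does not stop the BigQuery write from receiving an empty input, so it is diagnostic rather than a fix.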

Based on the stack trace, it looks like the error is actually in BigQueryIO - the file left in my bucket is 0 bytes, and perhaps that is what causes BigQueryIO a problem.

My use case is that I am using side outputs for DeadLetters and encountered this error when my job produced no dead-letter output, so robustly handling this would be useful.

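For context, the dead-letter pattern described above can be sketched with Beam's multi-output ParDo: failing elements are routed to a side output via a second TupleTag, and that side output may well be empty on a clean run, which is what hits the write. A minimal sketch, assuming Beam 2.x (class names and the try/catch body are illustrative, not my actual job):

```java
import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class DeadLetterSketch {
    // Anonymous subclasses so Beam can infer the element types.
    static final TupleTag<String> MAIN = new TupleTag<String>() {};
    static final TupleTag<String> DEAD = new TupleTag<String>() {};

    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

        PCollectionTuple outputs = pipeline
            .apply(Create.of(Arrays.asList("Dog", "Cat", "Goldfish")))
            .apply(ParDo.of(new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    try {
                        c.output(c.element().toUpperCase());
                    } catch (Exception e) {
                        // Failures go to the dead-letter side output,
                        // which is empty when every element succeeds.
                        c.output(DEAD, c.element());
                    }
                }
            }).withOutputTags(MAIN, TupleTagList.of(DEAD)));

        // Feeding this (possibly empty) collection into BigQueryIO
        // is what triggers the NullPointerException above.
        PCollection<String> deadLetters = outputs.get(DEAD);

        pipeline.run().waitUntilFinish();
    }
}
```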
The job should really be able to run in either batch or streaming mode. My best guess is to write any dead-letter output to GCS via TextIO in batch mode and to BigQuery when streaming, if that sounds sensible?

Any help gratefully received.

import java.util.Arrays;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

import com.google.api.services.bigquery.model.TableRow;

public class EmptyPCollection {

public static void main(String[] args) {

    PipelineOptions options = PipelineOptionsFactory.create();
    options.setTempLocation("gs://<your-bucket-here>/temp");
    Pipeline pipeline = Pipeline.create(options);
    String schema = "{\"fields\": [{\"name\": \"pet\", \"type\": \"string\", \"mode\": \"required\"}]}";
    String table = "<your-dataset>.<your-table>";
    List<String> pets = Arrays.asList("Dog", "Cat", "Goldfish");
    PCollection<String> inputText = pipeline.apply(Create.of(pets)).setCoder(StringUtf8Coder.of());
    PCollection<TableRow> rows = inputText.apply(ParDo.of(new DoFn<String, TableRow>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String text = c.element();
            if (text.startsWith("X")) {  // change to (D)og and works fine
                TableRow row = new TableRow();
                row.set("pet", text);
                c.output(row);
            }
        }
    }));

    rows.apply(BigQueryIO.writeTableRows().to(table).withJsonSchema(schema)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

    pipeline.run().waitUntilFinish();

    }
}

[direct-runner-worker] INFO org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter - Opening TableRowWriter to gs://<your-bucket>/temp/BigQueryWriteTemp/05c7a7c0786a4656abad97f11ef23d8e/2675e1c7-f4d7-4f78-a85f-a38095b57e6b.

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
at EmptyPCollection.main(EmptyPCollection.java:54)
Caused by: java.lang.NullPointerException
at org.apache.beam.sdk.io.gcp.bigquery.WriteTables.processElement(WriteTables.java:97)

1 Answer

#1


This looks like a bug in the BigQuery sink implementation within Apache Beam. The Apache Beam Jira would be the appropriate place to report it.

I have filed https://issues.apache.org/jira/browse/BEAM-2406 to track this issue.
