BigQuery loads manually, but not through the Java SDK

Date: 2021-12-29 14:25:49

I have a Dataflow pipeline running locally. The objective is to read a JSON file using TextIO, build sessions, and load the results into BigQuery. Given this structure, I have to create a temp directory in GCS and then load into BigQuery from there. Previously I had a data schema error that prevented me from loading the data (see here); that issue is resolved.

So now, when I run the pipeline locally, it finishes by dumping a temporary newline-delimited JSON file into GCS. The SDK then gives me the following:

Starting BigQuery load job beam_job_xxxx_00001-1: try 1/3
INFO [main] (BigQueryIO.java:2191) - BigQuery load job failed: beam_job_xxxx_00001-1
...
Exception in thread "main" com.google.cloud.dataflow.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Failed to create the load job beam_job_xxxx_00001, reached max retries: 3
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:187)
at pedesys.Dataflow.main(Dataflow.java:148)
Caused by: java.lang.RuntimeException: Failed to create the load job beam_job_xxxx_00001, reached max retries: 3
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Write$WriteTables.load(BigQueryIO.java:2198)
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Write$WriteTables.processElement(BigQueryIO.java:2146)

The errors are not very descriptive, and the data still does not land in BigQuery. What is puzzling is that if I go to the BigQuery UI and manually load the very same temporary file from GCS, the one the SDK's Dataflow pipeline dumped, into the same table, it works beautifully.

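For comparison, the manual load that succeeds is roughly equivalent to the bq invocation below; the staged file name under gs://test/temp is a placeholder, since the SDK generates it:

    bq load --source_format=NEWLINE_DELIMITED_JSON myproject:db.table gs://test/temp/<staged_file>.json
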
The relevant code parts are as follows:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.ParDo;

// Temp location in GCS where BigQueryIO stages the newline-delimited JSON
// files before issuing the load job.
PipelineOptions options = PipelineOptionsFactory.create();
options.as(BigQueryOptions.class)
        .setTempLocation("gs://test/temp");
Pipeline p = Pipeline.create(options);
...

...
// Convert each session element to a TableRow, then write with a load job.
session_windowed_items.apply(ParDo.of(new FormatAsTableRowFn()))
        .apply(BigQueryIO.Write
                .named("loadJob")
                .to("myproject:db.table")
                .withSchema(schema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        );
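
The question doesn't show FormatAsTableRowFn. For reference, a DoFn of that shape in the Dataflow 1.x SDK looks roughly like the sketch below; the KV<String, Long> element type and the column names are assumptions for illustration. The names set on each TableRow must match the TableSchema passed to withSchema, which is the usual source of load-job failures like the one above.

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.values.KV;

// Hypothetical sketch: converts one session element into a BigQuery row.
class FormatAsTableRowFn extends DoFn<KV<String, Long>, TableRow> {
    @Override
    public void processElement(ProcessContext c) {
        TableRow row = new TableRow()
                .set("session_id", c.element().getKey())      // assumed column name
                .set("event_count", c.element().getValue());  // assumed column name
        c.output(row);
    }
}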

1 Answer

#1 (score: 3)
The SDK is swallowing the error/exception and not reporting it to users. It's most likely a schema problem. To get the actual underlying error, you need to fetch the job details in either of the following ways (or programmatically, as sketched after the list):

  1. CLI - bq show -j beam_job_<xxxx>_00001-1
  2. Browser/Web: use "try it" at the bottom of the page here.
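
If you'd rather pull the job status from Java, a minimal sketch using the google-api-services-bigquery client (the same API the Dataflow SDK calls under the hood) is below. The project ID is an assumption; the job ID is the one from the log above.

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.Job;

public class ShowLoadJobError {
    public static void main(String[] args) throws Exception {
        // Application Default Credentials, scoped for BigQuery.
        GoogleCredential credential = GoogleCredential.getApplicationDefault()
                .createScoped(BigqueryScopes.all());
        Bigquery bigquery = new Bigquery.Builder(
                GoogleNetHttpTransport.newTrustedTransport(),
                JacksonFactory.getDefaultInstance(),
                credential)
                .setApplicationName("load-job-debug")
                .build();

        // "myproject" is assumed; the job ID comes from the Dataflow log above.
        Job job = bigquery.jobs().get("myproject", "beam_job_xxxx_00001-1").execute();

        // errorResult is the fatal error; errors usually carries the
        // per-field schema details that the SDK swallowed.
        System.out.println("errorResult: " + job.getStatus().getErrorResult());
        if (job.getStatus().getErrors() != null) {
            for (ErrorProto e : job.getStatus().getErrors()) {
                System.out.println(e.getReason() + " @ " + e.getLocation() + ": " + e.getMessage());
            }
        }
    }
}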

@jkff has raised an issue here to improve the error reporting.
