I have a Dataflow pipeline running locally. The objective is to read a JSON file using TextIO, build sessions, and load the result into BigQuery. Given this structure, I have to create a temp directory in GCS and then load into BigQuery from there. Previously I had a data schema error that prevented me from loading the data, see here. That issue is resolved.
So now when I run the pipeline locally, it ends by dumping a temporary newline-delimited JSON file into GCS. The SDK then gives me the following:
Starting BigQuery load job beam_job_xxxx_00001-1: try 1/3
INFO [main] (BigQueryIO.java:2191) - BigQuery load job failed: beam_job_xxxx_00001-1
...
Exception in thread "main" com.google.cloud.dataflow.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Failed to create the load job beam_job_xxxx_00001, reached max retries: 3
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:187)
at pedesys.Dataflow.main(Dataflow.java:148)
Caused by: java.lang.RuntimeException: Failed to create the load job beam_job_xxxx_00001, reached max retries: 3
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Write$WriteTables.load(BigQueryIO.java:2198)
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Write$WriteTables.processElement(BigQueryIO.java:2146)
The errors are not very descriptive, and the data still does not load into BigQuery. What is puzzling is that if I go to the BigQuery UI and manually load the same temporary file, the one the SDK's Dataflow pipeline dumped to GCS, into the same table, it works beautifully.
The relevant code parts are as follows:
PipelineOptions options = PipelineOptionsFactory.create();
options.as(BigQueryOptions.class)
    .setTempLocation("gs://test/temp"); // staging area for the BigQuery load files

Pipeline p = Pipeline.create(options);
...
...
session_windowed_items.apply(ParDo.of(new FormatAsTableRowFn()))
    .apply(BigQueryIO.Write
        .named("loadJob")
        .to("myproject:db.table")
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
    );
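For context, the elided middle of the pipeline looks roughly like the sketch below (Dataflow SDK 1.x style). The input path, the session gap duration, and the body of FormatAsTableRowFn are assumptions for illustration, not the actual code:

// Hypothetical read and session-windowing steps (Dataflow SDK 1.x).
PCollection<String> lines = p.apply(
    TextIO.Read.named("readJson").from("gs://test/input/*.json")); // assumed path
PCollection<String> session_windowed_items = lines.apply(
    Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(10)))); // assumed gap

// Hypothetical FormatAsTableRowFn: emits one TableRow per windowed element.
static class FormatAsTableRowFn extends DoFn<String, TableRow> {
  @Override
  public void processElement(ProcessContext c) {
    c.output(new TableRow().set("payload", c.element()));
  }
}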
1 Answer
#1
The SDK is swallowing the error/exception and not reporting it to users. It's most likely a schema problem. To get the actual error that is happening, fetch the job details in either of the following ways (a programmatic sketch follows the list):
- CLI: bq show -j beam_job_<xxxx>_00001-1
- Browser/Web: use "try it" at the bottom of the page here.
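If you prefer to fetch the job status from code, here is a minimal sketch using the google-api-services-bigquery client from the same era as SDK 1.x. The project ID, job ID, and application name are placeholders; jobs.get returns the errorResult and errors list that the SDK swallowed:

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.Job;

public class ShowLoadJobErrors {
  public static void main(String[] args) throws Exception {
    // Placeholder IDs: substitute your project and the failed load job's ID.
    String projectId = "myproject";
    String jobId = "beam_job_xxxx_00001-1";

    GoogleCredential credential = GoogleCredential.getApplicationDefault()
        .createScoped(BigqueryScopes.all());
    Bigquery bigquery = new Bigquery.Builder(
            GoogleNetHttpTransport.newTrustedTransport(),
            JacksonFactory.getDefaultInstance(),
            credential)
        .setApplicationName("show-load-job-errors") // arbitrary name
        .build();

    // jobs.get exposes the job status, including the terminal errorResult.
    Job job = bigquery.jobs().get(projectId, jobId).execute();
    System.out.println("errorResult: " + job.getStatus().getErrorResult());
    if (job.getStatus().getErrors() != null) {
      for (ErrorProto e : job.getStatus().getErrors()) {
        System.out.println("error: " + e.getMessage() + " @ " + e.getLocation());
      }
    }
  }
}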
@jkff has raised an issue here to improve the error reporting.