Dataflow batch job fails with "Failed to close some writers"

Time: 2022-08-01 15:40:25

I am running a batch pipeline with the Apache Beam 2.2 SDK on the Cloud Dataflow service. There are 751 text files that I parse using the TextIO.readAll() transform, deserialize, and write to a date-partitioned table in BigQuery.

The first thing I noticed is that autoscaling did not really kick in: it left the pipeline at 15 workers, even though I was able to push throughput a lot higher by, for example, manually setting the number of workers to 250.

My pipeline fails with the following stack trace:

(abed94a6f5139e21): java.io.IOException: Failed to close some writers
at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.finishBundle(WriteBundlesToFiles.java:248)
Suppressed: java.io.IOException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 503 Service Unavailable
Service Unavailable
    at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(AbstractGoogleAsyncWriteChannel.java:431)
    at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.close(AbstractGoogleAsyncWriteChannel.java:289)
    at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.close(TableRowWriter.java:81)
    at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.finishBundle(WriteBundlesToFiles.java:242)
    at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeFinishBundle(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.finishBundle(SimpleDoFnRunner.java:187)
    at com.google.cloud.dataflow.worker.SimpleParDoFn.finishBundle(SimpleParDoFn.java:407)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:60)
    at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
    at com.google.cloud.dataflow.worker.DataflowWorker.executeWork(DataflowWorker.java:330)
    at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:302)
    at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:251)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 503 Service Unavailable
Service Unavailable
    at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
    at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
    at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:432)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
    at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
    ... 4 more

Should I try with even more workers or split the work across several pipelines?

1 solution

#1


Thanks to the comment by jkff, it worked flawlessly after setting --maxNumWorkers=250 (15 seems to be the standard maximum). The 503 error was transient: Dataflow retried the failed work several times, and in the end the pipeline ran successfully.
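
To illustrate what "transient and retried" means here, below is a minimal, standard-library-only sketch of the retry-with-backoff pattern. It is not Dataflow's or Beam's actual retry code: the class name TransientRetry, the method callWithRetry, and the attempt count and backoff values are all hypothetical and chosen only for illustration. In a real pipeline you would not write this yourself, because the service reruns failed work on its own.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical helper, for illustration only. It is not part of Beam or Dataflow.
final class TransientRetry {

    private TransientRetry() {}

    /**
     * Calls the action and retries it when it throws an IOException
     * (for example, a wrapped "503 Service Unavailable").
     * The wait between attempts doubles each time (exponential backoff).
     * Any other exception type is treated as permanent and propagates at once.
     */
    static <T> T callWithRetry(Callable<T> action, int maxAttempts, long initialBackoffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long backoffMillis = initialBackoffMillis;
        IOException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (IOException e) {
                lastFailure = e;
                if (attempt < maxAttempts) {
                    // Wait before the next attempt, then double the wait.
                    Thread.sleep(backoffMillis);
                    backoffMillis *= 2;
                }
            }
        }
        // Every attempt failed, so surface the last transient error.
        throw lastFailure;
    }
}
```

With this sketch, an action that fails twice with an IOException and then succeeds returns its result on the third attempt, which mirrors what happened in the job above: the 503 responses appeared in the logs, but the retried work eventually went through.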
