
Time: 2021-09-08 14:04:31

We have a pipeline that reads data from BigQuery and processes historical data for various calendar years. It fails with OutOfMemoryError when the input data is small (~500 MB).


On startup it reads from BigQuery at about 10,000 elements/s; after a short time it slows down to hundreds of elements/s, then it hangs completely.


Observing 'Elements Added' on the next processing step (BQImportAndCompute), the value increases and then decreases again. That looks to me like some already loaded data is dropped and then loaded again.


The Stackdriver Logging console contains errors with various stack traces that include java.lang.OutOfMemoryError, for example:


Error reporting workitem progress update to Dataflow service:


"java.lang.OutOfMemoryError: Java heap space
    at com.google.cloud.dataflow.sdk.runners.worker.BigQueryAvroReader$BigQueryAvroFileIterator.getProgress(BigQueryAvroReader.java:145)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation$SynchronizedReaderIterator.setProgressFromIteratorConcurrent(ReadOperation.java:397)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation$SynchronizedReaderIterator.setProgressFromIterator(ReadOperation.java:389)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation$1.run(ReadOperation.java:206)

I would suspect a problem with the topology of the pipeline, but running the same pipeline


  1. locally with DirectPipelineRunner works fine
  2. in the cloud with DataflowPipelineRunner on a large dataset (5 GB, for another year) works fine

I assume the problem is in how Dataflow parallelizes and distributes work in the pipeline. Is there any way to inspect or influence it?


2 solutions



The problem here doesn't seem to be related to the size of the BigQuery table, but more likely to the number of BigQuery sources being used and to the rest of the pipeline.


  1. Instead of reading from multiple BigQuery sources and flattening them, have you tried reading from a single query that pulls in all the information? Doing that in a single step should simplify the pipeline and also allow BigQuery to execute better (one query against multiple tables vs. multiple queries against individual tables).


  2. Another possible problem is a high degree of fan-out within or after the BQImportAndCompute operation. Depending on the computation being done there, you may be able to reduce the fan-out using clever CombineFns or WindowFns. If you want help figuring out how to improve that path, please share more details about what is happening after BQImportAndCompute.
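
To illustrate the first suggestion, here is a minimal sketch of building one query over several per-year tables instead of creating one source per table. The project, dataset, and table prefix (`my-project`, `history`, `events_`) and the class name `YearlyTablesQuery` are hypothetical, since the question doesn't show the real table layout. The sketch assumes BigQuery legacy SQL, where a comma-separated table list in FROM acts as a union of the tables.

```java
import java.util.ArrayList;
import java.util.List;

/** Builds a single legacy-SQL query covering several per-year tables (hypothetical naming). */
final class YearlyTablesQuery {

    private YearlyTablesQuery() {}

    /**
     * Returns one query that reads every table from firstYear to lastYear inclusive.
     * Assumes tables are named <tablePrefix><year>, which is an illustration only.
     */
    static String build(String project, String dataset, String tablePrefix,
                        int firstYear, int lastYear) {
        if (firstYear > lastYear) {
            throw new IllegalArgumentException("firstYear must not exceed lastYear");
        }
        List<String> tables = new ArrayList<>();
        for (int year = firstYear; year <= lastYear; year++) {
            // Legacy SQL table reference: [project:dataset.table]
            tables.add("[" + project + ":" + dataset + "." + tablePrefix + year + "]");
        }
        // In legacy SQL, listing tables separated by commas unions their rows.
        return "SELECT * FROM " + String.join(", ", tables);
    }

    public static void main(String[] args) {
        String query = build("my-project", "history", "events_", 2012, 2014);
        System.out.println(query);
        // The resulting string would then feed a single read step, for example
        // BigQueryIO.Read.fromQuery(query) in the Dataflow SDK, replacing the
        // per-table reads and the Flatten that follows them.
    }
}
```

With one read step there is a single source for the service to split, and BigQuery runs one export job rather than one per table. Selecting only the columns the pipeline needs, instead of `SELECT *`, would reduce the data volume further.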




Have you tried debugging with Stackdriver?








