BigQueryIO - write performance with streaming and FILE_LOADS

Time: 2021-09-08 14:04:25

My pipeline: Kafka -> Dataflow streaming (Beam v2.3) -> BigQuery

Given that low latency isn't important in my case, I use FILE_LOADS to reduce costs, like this:

BigQueryIO.writeTableRows()
  .withJsonSchema(schema)
  .withWriteDisposition(WriteDisposition.WRITE_APPEND)
  .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
  .withMethod(Method.FILE_LOADS)                 // batch load jobs instead of streaming inserts
  .withTriggeringFrequency(triggeringFrequency)  // how often a load job is started
  .withCustomGcsTempLocation(gcsTempLocation)    // GCS location for the temporary files
  .withNumFileShards(numFileShards)              // number of file shards written per trigger
  .withoutValidation()
  .to(new SerializableFunction[ValueInSingleWindow[TableRow], TableDestination]() {
    def apply(element: ValueInSingleWindow[TableRow]): TableDestination = {
      ...
    }
  })
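
(For illustration, the destination function elided above just maps each element to a TableDestination; a minimal hypothetical version, with made-up project/dataset/table names, could look like this:)

import com.google.api.services.bigquery.model.TableRow
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.values.ValueInSingleWindow

// Hypothetical example of the elided destination function:
// route every element to a single table (names are made up).
val toEventsTable = new SerializableFunction[ValueInSingleWindow[TableRow], TableDestination] {
  override def apply(element: ValueInSingleWindow[TableRow]): TableDestination =
    new TableDestination("my-project:my_dataset.events", "Events table")
}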

This Dataflow step introduces an ever-increasing delay in the pipeline, so that it can't keep up with the Kafka throughput (less than 50k events/s), even with 40 n1-standard-4 workers. As shown in the screenshot below, the system lag for this step is very large (close to the pipeline's uptime), whereas the Kafka system lag is only a few seconds.

[Screenshot: system lag of the BigQueryIO write step]

If I understand correctly, Dataflow writes the elements into numFileShards files in gcsTempLocation, and every triggeringFrequency a load job is started to insert them into BigQuery. For instance, if I choose a triggeringFrequency of 5 minutes, I can see (with bq ls -a -j) that all the load jobs complete in less than 1 minute. But the step still introduces more and more delay, so Kafka ends up consuming fewer and fewer elements (due to backpressure). Increasing/decreasing numFileShards and triggeringFrequency doesn't fix the problem.
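
For completeness, here is roughly how the two knobs above are defined on my side (the concrete values below are illustrative, not the exact ones I run with):

import org.joda.time.Duration

// Illustrative values only: start a load job roughly every 5 minutes and
// spread the buffered data over 40 files per destination for each trigger.
val triggeringFrequency: Duration = Duration.standardMinutes(5)
val numFileShards: Int = 40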

I don't manually specify any window; I just use the default one. Files are not accumulating in gcsTempLocation.

Any idea what's going wrong here?

1 solution

#1

You mention that you don't explicitly specify a Window, which means that by default Dataflow will use the "Global window". The windowing documentation contains this warning:

Caution: Dataflow's default windowing behavior is to assign all elements of a PCollection to a single, global window, even for unbounded PCollections. Before you use a grouping transform such as GroupByKey on an unbounded PCollection, you must set a non-global windowing function. See Setting Your PCollection's Windowing Function.

If you don't set a non-global windowing function for your unbounded PCollection and subsequently use a grouping transform such as GroupByKey or Combine, your pipeline will generate an error upon construction and your Dataflow job will fail.

You can alternatively set a non-default Trigger for a PCollection to allow the global window to emit "early" results under some other conditions.

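For illustration, here is a minimal sketch of what applying a non-global window upstream of your BigQueryIO write could look like (`rows` is a hypothetical name for the unbounded PCollection[TableRow] read from Kafka, and the 5-minute window size is only illustrative):

import com.google.api.services.bigquery.model.TableRow
import org.apache.beam.sdk.transforms.windowing.{FixedWindows, Window}
import org.apache.beam.sdk.values.PCollection
import org.joda.time.Duration

// `rows` is assumed to be the unbounded PCollection[TableRow] coming from Kafka.
// Put it into 5-minute fixed windows before the BigQueryIO write, instead of
// leaving everything in the single global window.
val windowedRows: PCollection[TableRow] =
  rows.apply("FixedWindows",
    Window.into[TableRow](FixedWindows.of(Duration.standardMinutes(5))))

// windowedRows.apply(BigQueryIO.writeTableRows() ... as in the question)

The alternative mentioned above is to keep the global window and attach a non-default, repeatedly-firing trigger to it instead.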

It appears that your pipeline doesn't do any explicit grouping, but I wonder if the internal grouping done by the BigQuery write is causing issues.

Can you see in the UI if your downstream DropInputs has received any elements? If not, this is an indication that data is getting held up in the upstream BigQuery step.
