I have a Java DataFlow pipeline with the following parts:
- PubSub subscriber reading from several topics
- Flatten.pCollections operation
- Transform from PubsubMessage to TableRow
- BigQuery writer to write all to a dynamic table
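For reference, here is a minimal sketch of how the pipeline is wired up. The subscription paths, the MessageToTableRowFn conversion, and the dynamic destination function are simplified placeholders, not the actual production code:

```java
import com.google.api.services.bigquery.model.TableRow;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class MultiTopicToBigQuery {

  /** Placeholder conversion; the real DoFn maps payload/attributes to columns. */
  static class MessageToTableRowFn extends DoFn<PubsubMessage, TableRow> {
    @ProcessElement
    public void processElement(@Element PubsubMessage msg, OutputReceiver<TableRow> out) {
      out.output(new TableRow()
          .set("payload", new String(msg.getPayload(), StandardCharsets.UTF_8))
          .set("table_name", msg.getAttribute("table_name")));
    }
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Placeholder subscription paths; the real list is configurable.
    List<String> subscriptions = Arrays.asList(
        "projects/my-project/subscriptions/topic-a-sub",
        "projects/my-project/subscriptions/topic-b-sub");

    // One Pub/Sub read per subscription.
    List<PCollection<PubsubMessage>> perSubscription = new ArrayList<>();
    for (String sub : subscriptions) {
      perSubscription.add(pipeline.apply("Read " + sub,
          PubsubIO.readMessagesWithAttributes().fromSubscription(sub)));
    }

    // Flatten all reads, convert to TableRow, write to a dynamically chosen table.
    PCollectionList.of(perSubscription)
        .apply("Flatten", Flatten.pCollections())
        .apply("ToTableRow", ParDo.of(new MessageToTableRowFn()))
        .apply("WriteToBQ", BigQueryIO.writeTableRows()
            .to(row -> new TableDestination(
                "my-project:my_dataset." + row.getValue().get("table_name"), null))
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    pipeline.run();
  }
}
```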
When there is more than one Pub/Sub topic in the list of subscriptions to connect to, all elements get stuck in the GroupByKey operation inside the Reshuffle step of the BigQuery writer. I've let it run for a couple of hours after sending several dozen test messages, but nothing is written to BigQuery.
I found the following three workarounds (each of them works on its own, independently of the others):
- Add a 'withTimestampAttribute' call on the Pub/Sub subscriptions (see the sketch after this list). The name of the attribute does not matter at all; it can be any existing or non-existent attribute on the incoming messages
- Reduce the number of PubSub subscriptions to just 1
- Remove the Flatten.pCollections operation in between, creating multiple separate pipelines doing the exact same thing
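For the first workaround, the only change relative to the sketch above is on the Pub/Sub read; the attribute name ("ts" here) is arbitrary and does not have to exist on the messages:

```java
// Workaround 1: add a timestamp attribute to the read. The attribute name is
// arbitrary; even a non-existent attribute unblocks the pipeline in my case.
PCollection<PubsubMessage> messages = pipeline.apply("Read " + sub,
    PubsubIO.readMessagesWithAttributes()
        .fromSubscription(sub)
        .withTimestampAttribute("ts"));
```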
The messages are not intentionally timestamped - writing them to BigQuery using just the PubsubMessage timestamp is completely acceptable.
It also confuses me that even adding a non-existent timestamp attribute seems to fix the issue. I added debug output to print the timestamps within the pipeline, and they are comparable in both cases; when a non-existent timestamp attribute is specified, it appears to fall back to the Pub/Sub publish timestamp anyway.
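Roughly how I printed the timestamps: a pass-through step right after the read that logs each element's event timestamp (the step name and placement are illustrative):

```java
// Pass-through step that prints each element's event timestamp for debugging.
.apply("LogTimestamps", ParDo.of(new DoFn<PubsubMessage, PubsubMessage>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    System.out.println("event timestamp: " + c.timestamp());
    c.output(c.element());
  }
}))
```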
What could be causing this issue, and how can I resolve it? For me, the most acceptable workaround is removing the Flatten.pCollections operation, as it doesn't really complicate the code, but I can't get my head around why it fails in the first place.
1 Answer
Did you apply windowing to your pipeline? The Beam documentation warns about using an unbounded PCollection (such as Pub/Sub) without any windowing or triggering:
If you don’t set a non-global windowing function or a non-default trigger for your unbounded PCollection and subsequently use a grouping transform such as GroupByKey or Combine, your pipeline will generate an error upon construction and your job will fail.
In your case the pipeline does not fail at construction, but the messages are stuck in the GroupByKey because it is waiting for the window to end. Try adding a window before the BigQuery writer and see if that solves the problem.
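For example, a minimal sketch, assuming the flattened and converted TableRow collection is called rows and the same dynamic destination function (destinationFn) is reused; the 10-second fixed window is an arbitrary size, pick whatever fits your latency needs:

```java
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// Apply a non-global window before the BigQuery write. 10 seconds is just an example.
rows
    .apply("Window", Window.<TableRow>into(FixedWindows.of(Duration.standardSeconds(10))))
    .apply("WriteToBQ", BigQueryIO.writeTableRows()
        .to(destinationFn)  // the same dynamic destination function as before
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
```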