I was planning to use Google Dataflow to coordinate human-in-the-loop form completion, checking for conflicts after three forms have been completed. I have set up Google PubSub as both the Dataflow source and sink, and I simply want the trigger to fire and send output to the PubSub sink once three forms have been received for a given JobId.
This SO post looked similar to the problem I was trying to solve; however, when I implement it, the trigger fires and sends output to the PubSub sink before the AfterPane.elementCountAtLeast count is reached.
I have tried it with both the GlobalWindow and SlidingWindows. Once I get the trigger to fire after elementCountAtLeast is reached, I plan to implement a GroupByKey on the jobId. Before moving on to that step, though, I'd like to get elementCountAtLeast working in isolation.
Here is the code for reading from PubSub and applying the SlidingWindows:
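```java
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane;
import com.google.cloud.dataflow.sdk.transforms.windowing.Repeatedly;
import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.PCollection;
import org.joda.time.Duration;

// `pipeline` and `options` are created elsewhere in the job.
PCollection<String> humanInTheLoopInput = pipeline
    .apply(PubsubIO.Read
        .named("ReadFromHumanInTheLoopSubscription")
        .subscription(options.getInputHumanInTheLoopRawSubscription()));

// 30-second windows starting every 5 seconds, with a trigger requesting a pane
// after at least 3 elements; fired panes are discarded.
PCollection<String> windowedInput = humanInTheLoopInput
    .apply(Window
        .<String>into(SlidingWindows
            .of(Duration.standardSeconds(30))
            .every(Duration.standardSeconds(5)))
        .<String>triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(3)))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardDays(10)));
```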
Answer:
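Without a GroupByKey, nothing is being triggered. Both windowing and triggering only affect grouping (and combining) operations, so in the pipeline above each element passes through the Window transform and straight on to the sink as soon as it arrives.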
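To make the answer concrete, here is a plain-Java simulation (no Dataflow dependency, so it is not the Dataflow API) of the per-key buffering that a GroupByKey performs when the window uses `Repeatedly.forever(AfterPane.elementCountAtLeast(3))` with `discardingFiredPanes()`: values are buffered per key (here, a jobId), a pane is emitted once a key has at least three values, and that key's buffer is then cleared. The class name `PerKeyCountTrigger` and the job/form values are hypothetical, and the sketch ignores window boundaries. It is also a simplification: Dataflow only guarantees "at least" three elements per pane and may emit larger panes, whereas this sketch fires at exactly three.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical simulation of a count-based trigger applied per key, as a
 * GroupByKey would do within a single window. Not the Dataflow API.
 */
public class PerKeyCountTrigger {
    private final int threshold;
    // Values buffered per key (e.g. per jobId) that have not fired yet.
    private final Map<String, List<String>> buffers = new HashMap<>();

    public PerKeyCountTrigger(int threshold) {
        this.threshold = threshold;
    }

    /** Adds one value for a key; returns the fired pane, or null if the trigger has not fired. */
    public List<String> add(String key, String value) {
        List<String> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() >= threshold) {
            // Discarding mode: fired values are not carried into the key's next pane.
            buffers.remove(key);
            return buffer;
        }
        return null;
    }

    public static void main(String[] args) {
        PerKeyCountTrigger trigger = new PerKeyCountTrigger(3);
        String[][] forms = {
            {"job-1", "formA"}, {"job-2", "formX"}, {"job-1", "formB"},
            {"job-1", "formC"}, {"job-2", "formY"}
        };
        for (String[] form : forms) {
            List<String> pane = trigger.add(form[0], form[1]);
            if (pane != null) {
                // Only job-1 reaches three forms, so only it produces a pane.
                System.out.println(form[0] + " -> " + pane);
            }
        }
    }
}
```

Running `main` prints a single pane for `job-1` containing `formA`, `formB` and `formC`; `job-2` has only two forms, so nothing is emitted for it. Without the per-key buffer (the equivalent of having no GroupByKey), each form would be passed along immediately, which matches the behavior described in the question.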