Cloud Dataflow: Once trigger not working

Time: 2022-02-01 15:23:19

I have a Dataflow pipeline reading from an unbounded source. My window size is 10 hours, and I am trying to test my trigger using a TestStream. The trigger should emit an early result once the element count for the same key within a window reaches at least 2. I have the following trigger to achieve this:

input.apply(Window.into(FixedWindows.of(Duration.standardHours(12)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterPane.elementCountAtLeast(2))))
    .apply(Count.perElement())

We also tried:

Repeatedly.forever(AfterPane.elementCountAtLeast(2)).orFinally(AfterWatermark.pastEndOfWindow())

I expect early firings when asserting the result; however, I don't get all the results in

 PAssert.that(pipeline).inWindow(..)..

What am I doing wrong? Also, running the same test repeatedly yields different results, meaning the trigger returns different values from run to run.

1 solution

#1


Triggering is non-deterministic. A trigger gives you an early firing some time after its condition is satisfied, and then another early firing some time after the condition is satisfied again.

Exactly when a pane is emitted after the trigger condition is met is up to the runner. If you are using a batch runner, it may wait until all the data is available. How much input are you expecting for each key/window? Which runner are you using?
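To make the point about non-determinism concrete, here is a toy model in plain Java. It is not Beam code and does not use any Beam API; the class `EarlyFiringSketch`, the method `paneSizes`, and the `bundleSize` parameter are all hypothetical names made up for this illustration. The sketch assumes a runner that delivers elements for one key/window in bundles, checks an "element count at least N" condition only after each bundle, and discards a pane's contents once it has fired. Under those assumptions, the same five elements can come out as different early panes depending only on how they were bundled.

```java
import java.util.ArrayList;
import java.util.List;

public class EarlyFiringSketch {

    // Toy model (an assumption, not Beam's implementation): returns the number
    // of elements in each emitted pane for a single key and window.
    static List<Integer> paneSizes(int elementCount, int threshold, int bundleSize) {
        if (bundleSize <= 0) {
            throw new IllegalArgumentException("bundleSize must be positive");
        }
        List<Integer> panes = new ArrayList<>();
        int buffered = 0;
        for (int seen = 0; seen < elementCount; seen += bundleSize) {
            // The hypothetical runner delivers one bundle, then evaluates the trigger.
            buffered += Math.min(bundleSize, elementCount - seen);
            if (buffered >= threshold) {
                panes.add(buffered); // early firing: emit everything buffered so far
                buffered = 0;        // discarding behaviour: the next pane starts empty
            }
        }
        if (buffered > 0) {
            panes.add(buffered);     // leftovers are emitted when the window closes
        }
        return panes;
    }

    public static void main(String[] args) {
        // Same input (5 elements, threshold 2), three different bundlings.
        for (int bundleSize : new int[] {1, 3, 5}) {
            System.out.println("bundle size " + bundleSize
                + " -> pane sizes " + paneSizes(5, 2, bundleSize));
        }
    }
}
```

In this model a bundle size of 1 gives pane sizes `[2, 2, 1]`, a bundle size of 3 gives `[3, 2]`, and a bundle size of 5 gives `[5]`. A real runner's bundling is not under the test's control, so an assertion that expects one specific set of early panes can pass on one run and fail on the next.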
