Strange behavior of SlidingWindows when used with TestPipeline

Date: 2021-07-31 15:35:47

I've got a simple test that demonstrates an odd behaviour of sliding windows when used with TestPipeline. Basically, a bunch of strings is fed to the input and accumulated in the sliding window, then a sum aggregation is applied to count the duplicates, and finally the output of the aggregation function is logged. With a sliding window of 10 minutes' duration and a 5-minute period, I expected only one window to be used to store all the elements (since the next window only starts 5 minutes after the first one)...

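import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.MapElements;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SlidingWindowTest {
    private static PipelineOptions options = PipelineOptionsFactory.create();
    private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowTest.class);

    // Logs each (key, count) pair and the window it was emitted in, then passes it through.
    private static class IdentityDoFn extends DoFn<KV<String, Integer>, KV<String, Integer>>
        implements DoFn.RequiresWindowAccess {
        @Override
        public void processElement(ProcessContext processContext) throws Exception {
            KV<String, Integer> item = processContext.element();
            LOG.info("~~~~~~~~~~> {} => {}", item.getKey(), item.getValue());
            LOG.info("~~~~~~~~~~~ {}", processContext.window());
            processContext.output(item);
        }
    }

    @Test
    public void whatsWrongWithSlidingWindow() {
        Pipeline p = TestPipeline.create(options);

        p.apply(Create.of("cab", "abc", "a1b2c3", "abc", "a1b2c3"))
            // Pair every string with a count of 1.
            .apply(MapElements.via((String item) -> KV.of(item, 1))
                       .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}))
            // 10-minute windows starting every 5 minutes.
            .apply(Window.<KV<String, Integer>>into(SlidingWindows.of(Duration.standardMinutes(10))
                                                        .every(Duration.standardMinutes(5))))
            // Sum the counts per key (and per window).
            .apply(Sum.integersPerKey())
            .apply(ParDo.of(new IdentityDoFn()));

        p.run();
    }
}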

But instead, 8 windows fired. Is there something wrong with TestPipeline, or with my understanding of how sliding windows are supposed to work?

12:19:04.566 [main] DEBUG c.g.c.d.sdk.coders.CoderRegistry - Default coder for com.google.cloud.dataflow.sdk.values.KV<java.lang.String, java.lang.Integer>: KvCoder(StringUtf8Coder, VarIntCoder)
12:19:04.566 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> abc => 2
12:19:04.567 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:50:00.000Z..-290308-12-21T20:00:00.000Z)
12:19:04.567 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> abc => 2
12:19:04.567 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:55:00.000Z..-290308-12-21T20:05:00.000Z)
12:19:04.567 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> a1b2c3 => 2
12:19:04.567 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T20:00:00.000Z..-290308-12-21T20:10:00.000Z)
12:19:04.567 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> cab => 1
12:19:04.568 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:50:00.000Z..-290308-12-21T20:00:00.000Z)
12:19:04.568 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> a1b2c3 => 2
12:19:04.568 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:50:00.000Z..-290308-12-21T20:00:00.000Z)
12:19:04.568 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> cab => 1
12:19:04.568 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:55:00.000Z..-290308-12-21T20:05:00.000Z)
12:19:04.568 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> abc => 2
12:19:04.568 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T20:00:00.000Z..-290308-12-21T20:10:00.000Z)
12:19:04.568 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> cab => 1
12:19:04.568 [main] INFO  c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T20:00:00.000Z..-290308-12-21T20:10:00.000Z)

P.S. Dataflow SDK version: 1.8.0

1 answer

#1 (score: 2)

The expected behavior is different from what you observe, but it is also different from what you expect:

  • First, you have three different keys, so if they all fell into a single window, then you would expect three outputs.
  • For sliding windows of 10 minutes with a 5-minute period, every element necessarily falls into two windows. If an element's timestamp is at minute 1, it falls into both the window [0, 10) and the window [-5, 5). So you should expect six output values, two per key. It is a common pitfall to think of windows as something that updates as a pipeline runs; in fact, window assignment is computed purely from each element's timestamp, not from its arrival time or from the pipeline's execution.
  • The Create transform outputs all values with a timestamp of BoundedWindow.TIMESTAMP_MIN_VALUE, so they should all fall into the same two windows.
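To make the window arithmetic in the list above concrete, here is a minimal plain-Java sketch (no Dataflow or Beam dependency) of how sliding-window membership can be computed from an element's timestamp alone. The class and method names (`SlidingWindowSketch`, `windowsFor`, `expectedOutputs`) are hypothetical, the window offset is assumed to be zero, and the loop is an illustration of the "step back one period at a time" idea rather than the SDK's actual source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not SDK source: sliding windows [start, start + size)
// whose starts are multiples of `period` (offset assumed to be zero).
final class SlidingWindowSketch {
    static final long MINUTE = 60_000L;

    // Returns the [start, end) bounds, in millis, of every window containing the timestamp.
    static List<long[]> windowsFor(long timestampMillis, long sizeMillis, long periodMillis) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start at or before the timestamp; floorMod keeps this right for negative times.
        long lastStart = timestampMillis - Math.floorMod(timestampMillis, periodMillis);
        // Step back one period at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMillis - sizeMillis; start -= periodMillis) {
            windows.add(new long[] {start, start + sizeMillis});
        }
        return windows;
    }

    // With the default trigger, a per-key sum emits one value per (key, window) pair.
    // Assumes every element shares one timestamp, as with Create.
    static long expectedOutputs(List<String> keys, long timestampMillis, long sizeMillis, long periodMillis) {
        long distinctKeys = keys.stream().distinct().count();
        return distinctKeys * windowsFor(timestampMillis, sizeMillis, periodMillis).size();
    }

    public static void main(String[] args) {
        // An element stamped at minute 1, with 10-minute windows every 5 minutes.
        for (long[] w : windowsFor(MINUTE, 10 * MINUTE, 5 * MINUTE)) {
            System.out.println("[" + w[0] / MINUTE + ", " + w[1] / MINUTE + ")"); // [0, 10) then [-5, 5)
        }
        List<String> input = Arrays.asList("cab", "abc", "a1b2c3", "abc", "a1b2c3");
        System.out.println("expected outputs: " + expectedOutputs(input, MINUTE, 10 * MINUTE, 5 * MINUTE)); // 6
    }
}
```

Because the window size is exactly twice the period, the loop always runs twice for any timestamp, which is why three distinct keys sharing one timestamp should give six outputs rather than eight.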

Your example seems to indicate a real bug. It should not be possible for "a1b2c3" to land in two disjoint windows, nor for "abc" to land in three windows, two of which are disjoint.

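The reasoning behind calling this a bug can be checked mechanically: all inputs for a key carry the same timestamp, so every window that key's outputs land in must contain that one instant, which means the half-open intervals must share a common point. The helper below is a hypothetical illustration (`WindowConsistencySketch` and `shareAnInstant` are made-up names), applied to the windows from the log, rewritten as minutes after 19:50 purely for readability.

```java
// Hypothetical consistency check: windows [start, end) that all contain one
// timestamp must have a non-empty common intersection.
final class WindowConsistencySketch {
    // windows[i] = {start, end}; true if some instant lies in every window.
    static boolean shareAnInstant(long[][] windows) {
        long latestStart = Long.MIN_VALUE;
        long earliestEnd = Long.MAX_VALUE;
        for (long[] w : windows) {
            latestStart = Math.max(latestStart, w[0]);
            earliestEnd = Math.min(earliestEnd, w[1]);
        }
        // Half-open intervals intersect only if the latest start is before the earliest end.
        return latestStart < earliestEnd;
    }

    public static void main(String[] args) {
        // Windows from the log, as minutes after 19:50 (19:50 -> 0, 20:00 -> 10, 20:10 -> 20).
        long[][] a1b2c3 = {{10, 20}, {0, 10}};
        long[][] abc = {{0, 10}, {5, 15}, {10, 20}};
        long[][] overlappingPair = {{0, 10}, {5, 15}};
        System.out.println("a1b2c3 consistent: " + shareAnInstant(a1b2c3)); // false
        System.out.println("abc consistent: " + shareAnInstant(abc)); // false
        System.out.println("overlapping pair consistent: " + shareAnInstant(overlappingPair)); // true
    }
}
```

Two adjacent windows such as [19:50, 20:00) and [20:00, 20:10) fail the check because the end of one is excluded from it and is exactly the start of the other.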

Incidentally, you would benefit from checking out DataflowAssert (called PAssert in Beam) for testing the contents of a PCollection in a consistent, runner-independent way.
