How do I write a unit test for session windows in a Beam pipeline?

Date: 2021-05-04 15:35:01

I am writing a pipeline that is processing product events (create, update, delete). Each product belongs to a sale that has a certain duration. I want to be able to perform some aggregation on all the products in a given sale. For the purpose of this example, let's assume I just want a list of unique product IDs per sale.

Therefore, my pipeline is using session windows on the sale id with a very long gap duration (so when the sale closes and there are no more product updates being published, the window for that sale closes too). My question is, how do I write a unit test for that?

For the sake of this test, let's assume the following:

  • the events are just Strings containing the sale ID and the product ID, separated by a space,
  • applyDistinctProductsTransform does what I described above: it creates KV<String, String> elements where the key is the sale ID, applies session windows with a gap duration of 600 seconds, and finally creates a concatenated string of all product IDs per sale.
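
For reference, the per-sale aggregation described in the list above can be sketched in plain Java, ignoring windowing entirely. This is a hypothetical stand-in (DistinctProducts and distinctProductsPerSale are made-up names), not the asker's Beam transform, whose code is not shown here.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical plain-Java stand-in for the per-window aggregation; NOT the Beam transform itself.
public class DistinctProducts {

    public static Map<String, String> distinctProductsPerSale(List<String> events) {
        Map<String, TreeSet<String>> bySale = new TreeMap<>();
        for (String event : events) {
            // Each event is "<saleId> <productId>", separated by a single space.
            String[] parts = event.split(" ", 2);
            bySale.computeIfAbsent(parts[0], k -> new TreeSet<>()).add(parts[1]);
        }
        Map<String, String> result = new TreeMap<>();
        // The TreeSet removes duplicate product IDs and sorts them, so the joined string is deterministic.
        bySale.forEach((sale, products) -> result.put(sale, String.join(",", products)));
        return result;
    }

    public static void main(String[] args) {
        System.out.println(distinctProductsPerSale(List.of(
                "sale1 product1", "sale1 product2", "sale1 product1", "sale1 product3")));
        // prints {sale1=product1,product2,product3}
    }
}
```

Sorting matters for the test: PAssert compares the concatenated strings exactly, and Beam does not guarantee the order of values after a GroupByKey, so an unsorted concatenation inside the real transform could make an assertion on "product1,product2,product3" flaky.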

Here is what I have so far:

I create a TestStream and add some elements: 3 products for sale1. Next, I advance the watermark to 700, well beyond the gap duration. Another product is added and finally the watermark is advanced to infinity.

@Test
public void testSessionWindow() {
    // `pipeline` is assumed to be a TestPipeline declared as a JUnit @Rule in the test class.
    Coder<String> utfCoder = StringUtf8Coder.of();
    TestStream<String> onTimeProducts = TestStream.create(utfCoder)
            .addElements(
                    TimestampedValue.of("sale1 product1", new Instant(0)),
                    TimestampedValue.of("sale1 product2", new Instant(0)),
                    TimestampedValue.of("sale1 product3", new Instant(0)))
            .advanceWatermarkTo(new Instant(700)) // watermark passes trigger time
            .addElements(
                    TimestampedValue.of("sale1 product9", new Instant(710)))
            .advanceWatermarkToInfinity();

    PCollection<KV<String, String>> results = applyDistinctProductsTransform(pipeline, onTimeProducts);

    PAssert.that(results).containsInAnyOrder(
            KV.of("sale1", "product1,product2,product3"),
            KV.of("sale1", "product9"));
    pipeline.run().waitUntilFinish();
}

However:

  1. the pipeline outputs a single KV of (sale1, "product1,product2,product3,product9"), so product9 is appended to the existing window. I would have expected this product to be processed in a separate window and hence to end up as a separate element of the output PCollection.
  2. how can I get only the results of a single window in the PAssert? I know there is the inWindow function, and I've found an example for a fixed time window, but I don't know how to do the same for a session window.

You can check out the full code of the PTransform and the unit test.

1 Answer

#1

1) I believe you have a simple unit issue. The gap duration of 600 is specified in seconds (Duration.standardSeconds), yet new Instant(long) takes milliseconds. The 600-second gap is therefore much larger than the 700-millisecond interval between your events, which causes the sessions to be merged.
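
To make the unit mismatch concrete, here is a small plain-Java sketch. It uses java.time instead of Joda-Time and Beam, and the class and method names are hypothetical. It assumes, as point 2 explains, that each element gets the window [timestamp, timestamp + gap) and that overlapping windows merge; the strict-overlap check is a simplification of Beam's merge rule.

```java
import java.time.Duration;

// Hypothetical illustration of the seconds-vs-milliseconds mix-up; not Beam code.
public class GapUnits {

    // True if the half-open intervals [startA, endA) and [startB, endB) overlap.
    public static boolean overlaps(long startA, long endA, long startB, long endB) {
        return startA < endB && startB < endA;
    }

    // Each element's proto-window is assumed to be [timestamp, timestamp + gap).
    public static boolean sameSession(long firstMillis, long secondMillis, Duration gap) {
        long gapMillis = gap.toMillis();
        return overlaps(firstMillis, firstMillis + gapMillis, secondMillis, secondMillis + gapMillis);
    }

    public static void main(String[] args) {
        Duration gap = Duration.ofSeconds(600); // 600 s = 600000 ms

        // new Instant(710) means 710 ms: well inside the first element's 600 s window.
        System.out.println(sameSession(0, 710, gap)); // true

        // 710 s expressed in milliseconds: past the end of [0 ms, 600000 ms).
        System.out.println(sameSession(0, Duration.ofSeconds(710).toMillis(), gap)); // false
    }
}
```

In the Beam test itself, the equivalent fix would be to build the timestamps from seconds, for example new Instant(0).plus(Duration.standardSeconds(710)) for product9 and advanceWatermarkTo(new Instant(0).plus(Duration.standardSeconds(700))), so that the second element falls outside the first session.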

2) Sessions still use interval windows internally. You will need to compute what the output window will be once all sessions have been merged, based on your trigger strategy. By default, a session window assigns each element an IntervalWindow(timestamp, gap duration) and merges all overlapping windows into a larger window. For example, if you had the windows (start time, end time) [10, 14], [12, 18] and [4, 14] for the same session key, they would all be merged into a single [4, 18] window.
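
The merge described above can be sketched in plain Java. This is a simplified, hypothetical model (sort by start, then fold together windows that overlap), not Beam's own implementation; the Window record stands in for IntervalWindow, and bounds are plain long values.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of session-window merging; Window stands in for Beam's IntervalWindow.
public class SessionMerge {

    public record Window(long start, long end) {}

    public static List<Window> merge(List<Window> windows) {
        List<Window> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong(Window::start));
        List<Window> merged = new ArrayList<>();
        for (Window w : sorted) {
            if (!merged.isEmpty()) {
                Window last = merged.get(merged.size() - 1);
                if (w.start() < last.end()) {
                    // Overlap: extend the current merged window instead of starting a new one.
                    merged.set(merged.size() - 1, new Window(last.start(), Math.max(last.end(), w.end())));
                    continue;
                }
            }
            merged.add(w);
        }
        return merged;
    }

    public static void main(String[] args) {
        System.out.println(merge(List.of(new Window(10, 14), new Window(12, 18), new Window(4, 14))));
        // prints [Window[start=4, end=18]]
    }
}
```

For the test with timestamps in seconds, the three sale1 events at 0 s give the window [0 ms, 600000 ms) and product9 at 710 s gives [710000 ms, 1310000 ms). Those bounds can then be passed to PAssert, for example PAssert.that(results).inWindow(new IntervalWindow(new Instant(0), new Instant(600000))).containsInAnyOrder(KV.of("sale1", "product1,product2,product3")), with a second assertion for the other window.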
