
Time: 2022-11-20 19:17:31

We have a Dataflow job that subscribes to a Pub/Sub stream of events. We apply sliding windows of 1 hour with a 10-minute period. In our code, we run Count.perElement to get the count for each element, and then pass the result through Top.of to get the top N elements.


At a high level:
1) Read from Pub/Sub IO
2) Window.into(SlidingWindows.of(windowSize).every(period)) // windowSize = 1 hour, period = 10 mins
3) Count.perElement
4) Top.of(n, comparisonFunction)

What we're seeing is that the window appears to be applied twice, so the data is watermarked 1 hour 40 minutes (instead of 50 minutes) behind the current time. When we dig into the job graph on the Dataflow console, we see two GroupByKey operations being performed on the data: 1) As part of Count.perElement. The watermark from this step onwards is 50 minutes behind the current time, which is expected. 2) As part of Top.of (in the Combine.PerKey). The watermark here seems to be another 50 minutes behind. Thus, data in the steps below this is watermarked 1 hour 40 minutes behind.

This ultimately manifests in some downstream graphs being 50 minutes late.


Thus, it seems that windowing kicks in afresh every time a GroupByKey is applied.


Is this expected behavior? Is there any way we can make the windowing apply only to Count.perElement and turn it off after that?


Our code is something along the lines of:


final int top = 50;
final Duration windowSize = standardMinutes(60);
final Duration windowPeriod = standardMinutes(10);
final SlidingWindows window = SlidingWindows.of(windowSize).every(windowPeriod);


final Pipeline pipeline = Pipeline.create(options);

// Get events
final String eventTopic =
    "projects/" + options.getProject() + "/topics/eventLog";
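final PCollection<String> events = pipeline
    // The original snippet is cut off at this point. The two calls below are
    // assumed from steps 1 and 2 of the outline above, in Apache Beam 2.x style.
    .apply(PubsubIO.readStrings().fromTopic(eventTopic))
    .apply(Window.<String>into(window));
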
// Create toplist
final PCollection<List<KV<String, Long>>> topList = events
    .apply(Count.perElement()) //as eventIds are repeated
    // get top n to get top events
    .apply(Top.of(top, orderByValue()).withoutDefaults()); 
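
For readers unfamiliar with the two transforms, the following plain-Java sketch shows roughly what the Count.perElement and Top.of steps compute for the contents of a single window. It is an illustration only: `TopEventsSketch` and `topN` are hypothetical names, the data is held in memory rather than in a PCollection, and breaking ties by key is an assumption added here to make the output deterministic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-in for Count.perElement followed by Top.of, applied to the
// event IDs of one window held in memory. This is not the SDK implementation.
class TopEventsSketch {

    static List<Map.Entry<String, Long>> topN(List<String> eventIds, int n) {
        // In the spirit of Count.perElement: eventId -> number of occurrences.
        Map<String, Long> counts = eventIds.stream()
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

        // In the spirit of Top.of(n, orderByValue()): keep the n largest counts,
        // largest first. Ties are broken by key (an assumption made for this sketch).
        return counts.entrySet().stream()
            .sorted(Map.Entry.<String, Long>comparingByValue().reversed()
                .thenComparing(Map.Entry.<String, Long>comparingByKey()))
            .limit(n)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> window = Arrays.asList("a", "b", "a", "c", "a", "b");
        for (Map.Entry<String, Long> e : topN(window, 2)) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

In the real pipeline both steps run once per window, and each of them involves a grouping step, which is why two GroupByKey operations show up in the job graph.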

1 Solution



Windowing is not applied each time there is a GroupByKey. The lag you were seeing was likely the result of two issues, both of which should now be resolved.


The first was that data buffered for later windows at the first GroupByKey was preventing the watermark from advancing, which meant that the earlier windows were held up at the second GroupByKey. This has been fixed in the latest versions of the SDK.


The second was that the sliding windows were causing the amount of data to increase significantly. A new optimization has been added that uses the combine (you mentioned Count and Top) to reduce the amount of data.
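
To make the data-volume point concrete, here is a minimal plain-Java sketch (no SDK dependency) of how one timestamp maps onto sliding windows. `SlidingWindowFanOut` and `windowStarts` are hypothetical names, and aligning windows to the epoch is an assumption made for illustration rather than a copy of the SDK's code. With a 60-minute size and a 10-minute period, every element falls into 6 windows, so the number of elements entering the first grouping step grows roughly sixfold.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Dataflow SDK: lists the start times of every
// sliding window [start, start + size) that contains a given timestamp, assuming
// windows are aligned to the epoch (offset 0).
class SlidingWindowFanOut {

    static List<Instant> windowStarts(Instant timestamp, Duration size, Duration period) {
        long ts = timestamp.toEpochMilli();
        long sizeMs = size.toMillis();
        long periodMs = period.toMillis();

        // Start of the latest window that begins at or before the timestamp.
        long lastStart = ts - Math.floorMod(ts, periodMs);

        List<Instant> starts = new ArrayList<>();
        // Step back one period at a time while the window still covers the timestamp.
        for (long start = lastStart; start > ts - sizeMs; start -= periodMs) {
            starts.add(Instant.ofEpochMilli(start));
        }
        return starts;
    }

    public static void main(String[] args) {
        Instant event = Instant.parse("2016-01-01T12:34:56Z");
        List<Instant> starts =
            windowStarts(event, Duration.ofMinutes(60), Duration.ofMinutes(10));
        System.out.println(starts.size() + " windows contain " + event);
        for (Instant start : starts) {
            System.out.println("  window starting at " + start);
        }
    }
}
```

Combining before the shuffle, as the optimization described above does, means each window carries partial counts instead of six copies of every raw element.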



