We have a DataFlow job that is subscribed to a PubSub stream of events. We have applied sliding windows of 1 hour with a 10 minute period. In our code, we perform a Count.perElement to get the counts for each element and we then want to run this through a Top.of to get the top N elements.
我们有一个订阅了PubSub事件流的DataFlow作业。我们应用了1小时的滑动窗口,时间为10分钟。在我们的代码中,我们执行Count.perElement来获取每个元素的计数,然后我们想通过Top.of运行它来获得前N个元素。
At a high level: 1) Read from pubSub IO 2) Window.into(SlidingWindows.of(windowSize).every(period)) // windowSize = 1 hour, period = 10 mins 3) Count.perElement 4) Top.of(n, comparisonFunction)
在高级别:1)从pubSub IO读取2)Window.into(SlidingWindows.of(windowSize)。每个(句点))// windowSize = 1小时,句点= 10分钟3)Count.perElement 4)Top.of (n,comparisonFunction)
What we're seeing is that the window is being applied twice so data seems to be watermarked 1 hour 40 mins (instead of 50 mins) behind current time. When we dig into the job graph on the Dataflow console, we see that there are two groupByKey operations being performed on the data: 1) As part of Count.perElement. Watermark on the data from this step onwards is 50 minutes behind current time which is expected. 2) As part of the Top.of (in the Combine.PerKey). Watermark on this seems to be another 50 minutes behind the current time. Thus, data in steps below this is watermarked 1:40 mins behind.
我们看到的是窗口正在被应用两次,因此数据似乎在当前时间之后1小时40分钟(而不是50分钟)被加水印。当我们深入研究Dataflow控制台上的作业图时,我们发现对数据执行了两个groupByKey操作:1)作为Count.perElement的一部分。从此步骤开始的数据上的水印比当前时间晚50分钟,这是预期的。 2)作为Top.of的一部分(在Combine.PerKey中)。这个水印似乎比现在还落后50分钟。因此,以下步骤中的数据在1:40分钟后加水印。
This ultimately manifests in some downstream graphs being 50 minutes late.
这最终表现在一些下游图表迟到了50分钟。
Thus it seems like every time a GroupByKey is applied, windowing seems to kick in afresh.
因此,似乎每次应用GroupByKey时,窗口似乎都会重新启动。
Is this expected behavior? Anyway we can make the windowing only be applicable for the Count.perElement and turn it off after that?
这是预期的行为吗?无论如何,我们可以使窗口仅适用于Count.perElement并在此之后将其关闭?
Our code is something on the lines of:
我们的代码有以下几点:
final int top = 50;
final Duration windowSize = standardMinutes(60);
final Duration windowPeriod = standardMinutes(10);
final SlidingWindows window = SlidingWindows.of(windowSize).every(windowPeriod);
options.setWorkerMachineType("n1-standard-16");
options.setWorkerDiskType("compute.googleapis.com/projects//zones//diskTypes/pd-ssd");
options.setJobName(applicationName);
options.setStreaming(true);
options.setRunner(DataflowPipelineRunner.class);
final Pipeline pipeline = Pipeline.create(options);
// Get events
final String eventTopic =
"projects/" + options.getProject() + "/topics/eventLog";
final PCollection<String> events = pipeline
.apply(PubsubIO.Read.topic(eventTopic));
// Create toplist
final PCollection<List<KV<String, Long>>> topList = events
.apply(Window.into(window))
.apply(Count.perElement()) //as eventIds are repeated
// get top n to get top events
.apply(Top.of(top, orderByValue()).withoutDefaults());
1 个解决方案
#1
1
Windowing is not applied each time there is a GroupByKey. The lag you were seeing was likely the result of two issues, both of which should be resolved.
每次存在GroupByKey时都不会应用窗口化。您看到的滞后可能是两个问题的结果,两个问题都应该得到解决。
The first was that data that was buffered for later windows at the first group by key was preventing the watermark from advancing, which meant that the earlier windows were getting held up at the second group by key. This has been fixed in the latest versions of the SDK.
第一个是按键锁定第一组后续窗口的数据阻止水印前进,这意味着早期的窗口被按键锁定在第二组。这已在最新版本的SDK中修复。
The second was that the sliding windows was causing the amount of data to increase significantly. A new optimization has been added which uses the combine (you mentioned Count and Top) to reduce the amount of data.
第二个是滑动窗口导致数据量显着增加。添加了一个新的优化,它使用了组合(您提到的Count和Top)来减少数据量。
#1
1
Windowing is not applied each time there is a GroupByKey. The lag you were seeing was likely the result of two issues, both of which should be resolved.
每次存在GroupByKey时都不会应用窗口化。您看到的滞后可能是两个问题的结果,两个问题都应该得到解决。
The first was that data that was buffered for later windows at the first group by key was preventing the watermark from advancing, which meant that the earlier windows were getting held up at the second group by key. This has been fixed in the latest versions of the SDK.
第一个是按键锁定第一组后续窗口的数据阻止水印前进,这意味着早期的窗口被按键锁定在第二组。这已在最新版本的SDK中修复。
The second was that the sliding windows was causing the amount of data to increase significantly. A new optimization has been added which uses the combine (you mentioned Count and Top) to reduce the amount of data.
第二个是滑动窗口导致数据量显着增加。添加了一个新的优化,它使用了组合(您提到的Count和Top)来减少数据量。