How to "throttle" a PCollection in Apache Beam?

Date: 2022-01-05 15:34:27

I have what seems to be a common problem, but I can't figure out what Beam's recommended solution is.

I have a stream of raw events, and I want to "trigger" an alert whenever two separate events satisfy a condition within a 60-minute sliding window.

That is easy enough to do with SlidingWindows. The problem is that, because the windows overlap, I can get the same alert in multiple windows. How do I ultimately get a PCollection that outputs such an alert only once within a certain timeframe (a cooldown duration)?
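
To see why the duplicates appear, here is a small plain-Java sketch (no Beam dependency) of how a timestamp maps onto fixed-period sliding windows aligned to time 0. All times are minutes as plain longs; the 5-minute slide period and the class and method names are hypothetical, since the question only fixes the 60-minute window size. A pair of events triggers in every window that contains both of them, so the same alert is produced once per such window.

```java
import java.util.ArrayList;
import java.util.List;

/** Which fixed-period sliding windows contain a timestamp (all times in minutes). */
final class SlidingWindowAssignment {

  /** Start of every window [start, start + size) that contains t, latest first. */
  static List<Long> windowStarts(long t, long size, long period) {
    List<Long> starts = new ArrayList<>();
    long lastStart = t - Math.floorMod(t, period); // latest window start <= t
    for (long start = lastStart; start > t - size; start -= period) {
      starts.add(start);
    }
    return starts;
  }

  /** Windows containing both events, i.e. every window in which the pair "triggers". */
  static List<Long> windowsContainingBoth(long t1, long t2, long size, long period) {
    long earlier = Math.min(t1, t2);
    long later = Math.max(t1, t2);
    List<Long> both = new ArrayList<>();
    for (long start : windowStarts(later, size, period)) {
      if (start <= earlier) { // window [start, start + size) also covers the earlier event
        both.add(start);
      }
    }
    return both;
  }

  public static void main(String[] args) {
    // Hypothetical 5-minute slide period; the question only fixes the 60-minute size.
    List<Long> firing = windowsContainingBoth(10, 40, 60, 5);
    System.out.println("alert fires in " + firing.size() + " windows starting at " + firing);
  }
}
```

With events at minute 10 and minute 40, `main` reports six windows (starting at 10, 5, 0, -5, -10 and -15), i.e. six copies of one alert. A shorter slide period makes the count grow, which is why the de-duplication has to happen after the sliding step.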

I first thought that the recently added stateful processing feature would be my solution, but then realized that it only works within a window. The same goes for side inputs. So it seems to me that I need a way to break out of the windows and handle the alert "firings" in a single window (possibly a session window). But the docs don't mention any way to reassign elements to new windows.

2 solutions

#1

I ended up with a re-windowing strategy, similar to what @Kenn suggested.

I take the alerts from the sliding-windowed collection and re-window them into session windows:

alerts  // hypothetical name: the PCollection of alerts from the sliding-window step
    .apply(Window.remerge())
    .apply(Window.into(Sessions.withGapDuration(Duration.standardHours(1))));

In that windowed collection I can simply do a GroupByKey to get all alerts of a session, and within each session apply my cooldown logic of emitting at most one alert per hour.
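
A minimal sketch of that per-session cooldown step, assuming the grouped alerts reduce to their timestamps (minutes, as plain longs) and a 60-minute cooldown. It is plain Java rather than a Beam DoFn, and `applyCooldown` is a hypothetical helper; it takes an `Iterable` because that is what a GroupByKey hands you per key. The values in a group come in no guaranteed order, so it sorts them before applying the cooldown.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Cooldown over the alerts of one session group (times in minutes). */
final class SessionCooldown {

  /** Keeps an alert only if it is at least `cooldown` after the previously kept alert. */
  static List<Long> applyCooldown(Iterable<Long> alertTimes, long cooldown) {
    List<Long> sorted = new ArrayList<>();
    for (Long t : alertTimes) {
      sorted.add(t);
    }
    Collections.sort(sorted); // grouped values carry no ordering guarantee, so sort by time
    List<Long> kept = new ArrayList<>();
    for (long t : sorted) {
      if (kept.isEmpty() || t - kept.get(kept.size() - 1) >= cooldown) {
        kept.add(t);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    // Hypothetical session contents: copies and near-copies of the same alert.
    List<Long> session = Arrays.asList(50L, 0L, 130L, 10L, 65L);
    System.out.println(applyCooldown(session, 60)); // [0, 65, 130]
  }
}
```

Within one session this keeps the first alert and then at most one alert per 60 minutes.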

#2

Interesting application!

Just to summarize:

  • It sounds like, for your use case, "sliding window" means continuously sliding. You could choose a minimum granularity (a slide period), but it isn't necessarily natural.
  • Each set of events you are interested in should only produce one output.

There are a couple of ways to approach this, depending on the rest of your application.

One way is to leave your data in the global window and use state. You will then have to manage lateness yourself: dropping elements that arrive too late, accounting for out-of-order data, and generally keeping your state bounded.
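
In Beam this would be a stateful DoFn in the global window, with per-key state for the last emitted alert and an event-time timer to clear it. The sketch below is only a plain-Java model of that bookkeeping, not Beam API: a `HashMap` stands in for the per-key state, the watermark is passed in explicitly, and `onWatermark` plays the role of the cleanup timer. The class, the 60-minute cooldown and the 10-minute allowed lateness are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Plain-Java model of per-key cooldown state in the global window (times in minutes).
 * The HashMap stands in for per-key state; onWatermark() stands in for an event-time timer.
 */
final class GlobalWindowCooldown {
  private final long cooldown;
  private final long allowedLateness;
  private final Map<String, Long> lastEmitted = new HashMap<>();

  GlobalWindowCooldown(long cooldown, long allowedLateness) {
    this.cooldown = cooldown;
    this.allowedLateness = allowedLateness;
  }

  /** Returns true if the alert for `key` at `eventTime` should be emitted. */
  boolean process(String key, long eventTime, long watermark) {
    if (eventTime < watermark - allowedLateness) {
      return false; // too late: the model drops it explicitly
    }
    Long last = lastEmitted.get(key);
    if (last != null && Math.abs(eventTime - last) < cooldown) {
      return false; // within the cooldown of an emitted alert, whichever arrived first
    }
    // Simplification: remember only the latest emitted timestamp per key.
    lastEmitted.put(key, last == null ? eventTime : Math.max(last, eventTime));
    return true;
  }

  /** Forgets keys that can no longer suppress anything, keeping state bounded. */
  void onWatermark(long watermark) {
    // Any alert still accepted has eventTime >= watermark - allowedLateness.
    lastEmitted.values().removeIf(last -> last + cooldown < watermark - allowedLateness);
  }

  int stateSize() {
    return lastEmitted.size();
  }

  public static void main(String[] args) {
    GlobalWindowCooldown dedup = new GlobalWindowCooldown(60, 10); // hypothetical values
    System.out.println(dedup.process("sensor-1", 0, 0));   // true: first alert
    System.out.println(dedup.process("sensor-1", 30, 30)); // false: inside cooldown
    System.out.println(dedup.process("sensor-1", 70, 70)); // true: cooldown elapsed
    System.out.println(dedup.process("sensor-1", 40, 80)); // false: beyond allowed lateness
    dedup.onWatermark(200);
    System.out.println(dedup.stateSize());                 // 0: state was cleaned up
  }
}
```

The eviction rule is what keeps state bounded: once `last + cooldown` is older than the lateness horizon, no alert the model still accepts can fall inside that cooldown, so the entry can be dropped.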

Another way is to use sliding windows with a Combine or state (basically what you have already tried), then re-window the alerts and de-duplicate them. You could use fixed windows for this, since the alert should have a deterministic timestamp; conveniently, the end of each fixed window governs when its state is automatically garbage-collected.
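
Here is a plain-Java sketch of that second approach, assuming each alert carries a deterministic timestamp (for example, that of the later of the two matching events; that choice is hypothetical). All copies of one alert then share a timestamp, land in the same fixed window, and collapse under an exact-duplicate filter. In a pipeline this step would roughly correspond to `Window.into(FixedWindows.of(...))` followed by a de-duplicating transform such as `Distinct`; the `Alert` class, the 60-minute window size and the method names here are illustrative only.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;

/** Re-windows alerts into fixed windows and drops exact duplicates (times in minutes). */
final class FixedWindowDedup {

  /** Hypothetical alert: a key plus the deterministic timestamp the alert was given. */
  static final class Alert {
    final String key;
    final long timestamp;

    Alert(String key, long timestamp) {
      this.key = key;
      this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Alert)) {
        return false;
      }
      Alert other = (Alert) o;
      return key.equals(other.key) && timestamp == other.timestamp;
    }

    @Override
    public int hashCode() {
      return Objects.hash(key, timestamp);
    }

    @Override
    public String toString() {
      return key + "@" + timestamp;
    }
  }

  /** Maps each fixed-window start to the distinct alerts whose timestamp falls in it. */
  static Map<Long, Set<Alert>> dedupInFixedWindows(List<Alert> alerts, long size) {
    Map<Long, Set<Alert>> byWindow = new TreeMap<>();
    for (Alert a : alerts) {
      long windowStart = a.timestamp - Math.floorMod(a.timestamp, size);
      byWindow.computeIfAbsent(windowStart, w -> new LinkedHashSet<>()).add(a);
    }
    return byWindow;
  }

  public static void main(String[] args) {
    // Copies of the same alert emitted by several overlapping sliding windows.
    List<Alert> alerts = Arrays.asList(
        new Alert("k", 40), new Alert("k", 40), new Alert("k", 40),
        new Alert("k", 125), new Alert("k", 125));
    System.out.println(dedupInFixedWindows(alerts, 60)); // {0=[k@40], 120=[k@125]}
  }
}
```

This removes only exact copies of the same alert: two distinct alerts with different timestamps both survive, so a cooldown between different alerts still needs logic like the stateful or session-based variants.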
