如何在Google数据流中创建个性化WindowFn

时间:2022-12-15 15:31:08

I'd like to create a different WindowFn in a such way to assign Windows to any of my input elements based on another field instead of based on my input entry's timestamp. I know the pre-defined WindowFn's from Google DataFlow SDK use the timestamp as a criteria to assign window.

我想以这样的方式创建一个不同的WindowFn,根据另一个字段而不是基于输入条目的时间戳,将Windows分配给我的任何输入元素。我知道Google DataFlow SDK中预定义的WindowFn使用时间戳作为分配窗口的标准。

More specifically I'd like to create a kind of SlidingWindows but instead of considering timestamp as the Window assignment criteria I'd like to consider another field as that criteria.

更具体地说,我想创建一种SlidingWindows,但不考虑时间戳作为Window赋值标准,我想将另一个字段视为该条件。

How could I create my customised WindowFn? What are the points that I should consider when creating my own WindowFn?

我怎样才能创建自定义的WindowFn?在创建自己的WindowFn时我应该考虑哪些要点?

Thanks.

谢谢。

1 个解决方案

#1


2  

To create a new WindowFn, you just need to inherit from WindowFn or a subclass and override the various abstract methods.

要创建一个新的WindowFn,您只需要从WindowFn或子类继承并覆盖各种抽象方法。

In your case, you don't need window merging, so you can inherit from NonMergingWindowFn, and your code could look something like

在你的情况下,你不需要窗口合并,所以你可以从NonMergingWindowFn继承,你的代码看起来像

public class MyWindowFn extends NonMergingWindowFn<ElementT, IntervalWindow> {
  public Collection<W> assignWindows(AssignContext c) {
    return setOfWindowsElementShouldBeIn(c.element());
  }

  public boolean isCompatible(WindowFn other) {
    return other instanceof MyWindowFn;
  }

  public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
  }

  public W getSideInputWindow(final BoundedWindow window) {
    // You may not need this if you won't ever be using PCollections windowed 
    // with this as side inputs.  If that's the case, just throw.
    // Otherwise you'll need to figure out how to map the main input windows
    // into the windows generated by this WindowFn.
  }
}

#1


2  

To create a new WindowFn, you just need to inherit from WindowFn or a subclass and override the various abstract methods.

要创建一个新的WindowFn,您只需要从WindowFn或子类继承并覆盖各种抽象方法。

In your case, you don't need window merging, so you can inherit from NonMergingWindowFn, and your code could look something like

在你的情况下,你不需要窗口合并,所以你可以从NonMergingWindowFn继承,你的代码看起来像

public class MyWindowFn extends NonMergingWindowFn<ElementT, IntervalWindow> {
  public Collection<W> assignWindows(AssignContext c) {
    return setOfWindowsElementShouldBeIn(c.element());
  }

  public boolean isCompatible(WindowFn other) {
    return other instanceof MyWindowFn;
  }

  public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
  }

  public W getSideInputWindow(final BoundedWindow window) {
    // You may not need this if you won't ever be using PCollections windowed 
    // with this as side inputs.  If that's the case, just throw.
    // Otherwise you'll need to figure out how to map the main input windows
    // into the windows generated by this WindowFn.
  }
}