I'm currently creating a PCollectionView by reading filtering information from a GCS bucket and passing it as a side input to different stages of my pipeline in order to filter the output. If the file in the GCS bucket changes, I want the currently running pipeline to use the new filter info. Is there a way to update this PCollectionView on each new window of data if my filter changes? I thought I could do it in startBundle, but I can't figure out how, or whether it's even possible. If it is possible, could you give an example?
```java
PCollectionView<Map<String, TagObject>> tagMapView =
    pipeline.apply(TextIO.Read.named("TagListTextRead")
                .from("gs://tag-list-bucket/tag-list.json"))
            .apply(ParDo.named("TagsToTagMap").of(new Tags.BuildTagListMapFn()))
            .apply("MakeTagMapView", View.asSingleton());

PCollection<String> windowedData =
    pipeline.apply(PubsubIO.Read.topic("myTopic"))
            .apply(Window.<String>into(
                SlidingWindows.of(Duration.standardMinutes(15))
                    .every(Duration.standardSeconds(31))));

PCollection<MY_DATA> lineData = windowedData
    .apply(ParDo.named("ExtractJsonObject")
        .withSideInputs(tagMapView)
        .of(new ExtractJsonObjectFn()));
```
Answer:
You probably want something like "use a version of the filter that is at most 1 minute old as a side input" (since, in theory, the file can change frequently, unpredictably, and independently of your pipeline, there's really no way to completely synchronize changes to the file with the behavior of the pipeline).
Here's a (granted, rather clumsy) solution I was able to come up with. It relies on the fact that side inputs are implicitly keyed by window as well: an element in a given main-input window reads the side-input value from the corresponding side-input window. In this solution, we create a side input windowed into 1-minute fixed windows, where each window contains a single value of the tag map, derived from the filter file as of some moment inside that window.
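```java
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
PCollection<Long> ticks = p
    // Produce 1 "tick" per second (Beam 2.x: GenerateSequence replaces CountingInput)
    .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
    // Window the ticks into 1-minute windows
    .apply(Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1))))
    // Use an arbitrary per-window combiner to reduce to 1 element per window;
    // withoutDefaults() is required because the input is not in the global window
    .apply(Count.<Long>globally().withoutDefaults());
// Produce a collection of tag maps, 1 per each 1-minute window
PCollectionView<TagMap> tagMapView = ticks
    .apply(MapElements.into(TypeDescriptor.of(TagMap.class))
        // readTagMapFromGcs() is a hypothetical static helper that reads and parses the JSON file
        .via((Long ignored) -> readTagMapFromGcs()))
    .apply(View.<TagMap>asSingleton());
```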
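`View.asSingleton()` expects exactly one element per side-input window (unless a default value is configured), which is why the ticks in each minute are collapsed with a combiner before the file is read. The plain-Java model below (hypothetical class name, no Beam dependency) mimics that step: it assigns one-per-second tick timestamps to 1-minute fixed windows and counts them, producing a single value per non-empty window.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model (not Beam code) of the "one element per 1-minute window" step. */
public class TicksPerWindow {

    /** Groups tick timestamps (epoch seconds) into fixed windows and counts the ticks in each. */
    static Map<Long, Long> countPerWindow(long[] tickSeconds, long windowSeconds) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long t : tickSeconds) {
            long windowStart = Math.floorDiv(t, windowSeconds) * windowSeconds;
            counts.merge(windowStart, 1L, Long::sum);
        }
        // Like Count.globally().withoutDefaults(): a window with no ticks gets no entry at all.
        return counts;
    }

    public static void main(String[] args) {
        // One tick per second for three minutes.
        long[] ticks = new long[180];
        for (int i = 0; i < ticks.length; i++) {
            ticks[i] = i;
        }
        System.out.println(countPerWindow(ticks, 60)); // {0=60, 60=60, 120=60}
    }
}
```

Each resulting entry stands for the single element that the answer then maps to a freshly read tag map, so every 1-minute side-input window holds exactly one value.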
This pattern (joining against slowly changing external data as a side input) comes up repeatedly, and the solution I'm proposing here is far from perfect; I wish we had better support for it in the programming model. I've filed a BEAM JIRA issue to track this.