Apache Beam streaming pipeline to monitor GCS file glob/regex patterns

Date: 2021-03-03 15:22:25

I have a streaming Beam pipeline in which I monitor multiple glob/regex patterns. Some of those patterns already have matching files, while others will only match files generated in the future.

PCollection<String> fileGlobs = p.apply(Create.of(filePatterns));

PCollection<Metadata> f = fileGlobs.apply("MatchALL",
    FileIO.matchAll().continuously(
        Duration.standardSeconds(10),
        Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));

// f = ... some more transforms, then write the results back to GCS ...

The expected behaviour is to match the existing files for the provided patterns and also to keep watching for new files matching those patterns being written to GCS. The termination condition I enforce is: stop matching a pattern if the last file matching that particular pattern was generated more than an hour ago. The observed behaviour is that we match a lot of files, but the transforms after the unbounded f are not executed at all. The logs just show

polling returned 681384 results, of which 681384 were new. The output is incomplete.

I give two different regex patterns to watch. One of the patterns already had ~500k matching files, with more being added every minute; for that pattern I never saw any output, just the log line above. The second pattern matched 0 files when the pipeline started, but as soon as it began matching newly arriving files at some later point, those output files were written to GCS.

Can someone explain this behaviour, and whether I am using matchAll().continuously() correctly? I don't create any windows here because my use case is pretty simple: stream files -> read files -> filter some events -> write those files back to some GCS bucket.

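For reference, here is how I understand the afterTimeSinceNewOutput termination condition to behave per pattern, sketched in plain Java (the class and method names below are hypothetical illustrations, not the Beam API):

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative sketch only: models the semantics of
// Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1)) --
// stop polling a pattern once no *new* file has matched it for an hour.
class TerminationTracker {
    private final Duration maxQuietPeriod;
    private Instant lastNewOutput;

    TerminationTracker(Duration maxQuietPeriod, Instant start) {
        this.maxQuietPeriod = maxQuietPeriod;
        this.lastNewOutput = start;
    }

    // Called after each polling round with the number of previously unseen matches.
    void recordPoll(int newMatches, Instant now) {
        if (newMatches > 0) {
            lastNewOutput = now; // the quiet-period clock resets on any new output
        }
    }

    boolean shouldTerminate(Instant now) {
        return Duration.between(lastNewOutput, now).compareTo(maxQuietPeriod) > 0;
    }
}

public class Main {
    public static void main(String[] args) {
        Instant t0 = Instant.parse("2021-03-03T00:00:00Z");
        TerminationTracker tracker = new TerminationTracker(Duration.ofHours(1), t0);

        // New files arrive at t+10min, so the pattern is still being watched at t+20min.
        tracker.recordPoll(500, t0.plusSeconds(600));
        System.out.println(tracker.shouldTerminate(t0.plusSeconds(1200))); // prints false

        // No new matches for just over an hour after the last output: terminate.
        tracker.recordPoll(0, t0.plusSeconds(4800));
        System.out.println(tracker.shouldTerminate(t0.plusSeconds(600 + 3601))); // prints true
    }
}
```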

1 Answer

#1


This is a bug in Splittable DoFn that affects the Watch transform in case a single round of polling takes more than 10 seconds - which happens when watching a filepattern that matches a very large number of files. The bug causes no output to be produced, because the transform gets checkpointed before it makes any progress, so when it resumes from the checkpoint, it's "back to square 1" in a sense.

Please follow the JIRA issue for updates and a suggested workaround.

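Until the fix lands, one mitigation to consider (my own assumption, not necessarily the workaround suggested on the JIRA) is to reduce how many files any single polling round returns, for example by splitting one broad glob into narrower per-day globs. A sketch with a hypothetical bucket/path layout:

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

public class GlobSharder {
    // Hypothetical helper: split one broad glob into per-day globs so that each
    // pattern matches far fewer files per polling round. The bucket name, prefix,
    // and date-partitioned layout below are assumed examples, not from the question.
    static List<String> dailyGlobs(String bucket, String prefix, LocalDate start, int days) {
        List<String> globs = new ArrayList<>();
        for (int i = 0; i < days; i++) {
            LocalDate d = start.plusDays(i); // ISO yyyy-MM-dd in the path
            globs.add(String.format("gs://%s/%s/%s/*.avro", bucket, prefix, d));
        }
        return globs;
    }

    public static void main(String[] args) {
        // Each resulting glob could be fed to FileIO.matchAll().continuously(...)
        // as a separate element, in place of one glob matching ~500k files.
        for (String glob : dailyGlobs("my-bucket", "events", LocalDate.of(2021, 3, 1), 3)) {
            System.out.println(glob);
        }
    }
}
```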