Reading files whose GCS filenames arrive in a PCollection in a pipeline?

Time: 2021-07-12 15:33:56

I have a streaming pipeline connected to Pub/Sub, which publishes the filenames of GCS files. For each filename I want to read the file and parse the event on each line (the events are what I ultimately want to process).

Can I use TextIO for this? Can it be used in a streaming pipeline when the filenames are only known during execution (as opposed to using TextIO as a source, where the filename(s) are known at construction time)? If not, I'm thinking of doing something like the following:

1. Read the filenames from the Pub/Sub topic.
2. Use a ParDo to read each file and get its lines.
3. Process the lines of the file...

Could I use the FileBasedReader or something similar to read the files in this case? The files aren't very big, so I don't need to parallelize the reading of a single file, but I do need to read a lot of files.
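
The ParDo plan above can be sketched roughly as follows. This is a minimal, untested sketch: `ReadLinesFn` and `readLines` are hypothetical names, and it assumes the Beam Java SDK's `FileSystems` API, which resolves `gs://` paths when the GCS filesystem is registered (as it is on Dataflow workers) and plain local paths by default. It does not use `FileBasedReader`, which belongs to the `FileBasedSource` machinery; each file is simply read whole by one worker and buffered in memory, which is only reasonable because the files are small.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;

/**
 * Hypothetical DoFn: each input element is a file name (e.g. a gs:// path
 * received from Pub/Sub); each output element is one line of that file.
 */
public class ReadLinesFn extends DoFn<String, String> {

  @ProcessElement
  public void processElement(@Element String fileName, OutputReceiver<String> out)
      throws IOException {
    for (String line : readLines(fileName)) {
      out.output(line);
    }
  }

  /** Reads a whole file through Beam's FileSystems, so gs:// and local paths both work. */
  static List<String> readLines(String fileName) throws IOException {
    ResourceId resource = FileSystems.matchNewResource(fileName, false /* isDirectory */);
    List<String> lines = new ArrayList<>();
    // Buffers the entire file in memory; acceptable only for small files.
    try (ReadableByteChannel channel = FileSystems.open(resource);
        BufferedReader reader =
            new BufferedReader(Channels.newReader(channel, StandardCharsets.UTF_8.name()))) {
      String line;
      while ((line = reader.readLine()) != null) {
        lines.add(line);
      }
    }
    return lines;
  }
}
```

After the Pub/Sub read, this would be applied as `filenames.apply(ParDo.of(new ReadLinesFn()))`, with the runner distributing the filenames (and therefore the files) across workers.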

1 solution

#1


4  

You can use the TextIO.readAll() transform, which was recently added to Beam in #3443. For example:

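// Filenames published to Pub/Sub (source options elided)
PCollection<String> filenames = p.apply(PubsubIO.readStrings()...);
// Read every line of every file named in the stream
PCollection<String> lines = filenames.apply(TextIO.readAll());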

This reads all lines of every file whose name arrives over Pub/Sub.
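
In later Beam releases `TextIO.readAll()` is deprecated in favour of composing `FileIO.matchAll()`, `FileIO.readMatches()` and `TextIO.readFiles()`. A minimal sketch of the same pipeline written that way follows; the class name `ReadFilesFromPubsub`, the helper `readLines`, and the subscription path are hypothetical placeholders, and it assumes the Beam GCP IO module (for `PubsubIO`) is on the classpath.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.values.PCollection;

public class ReadFilesFromPubsub {

  // Hypothetical subscription; replace with your own.
  static final String SUBSCRIPTION = "projects/my-project/subscriptions/gcs-filenames";

  /** Expands a stream of file names (or globs) into the lines of those files. */
  static PCollection<String> readLines(PCollection<String> filenames) {
    return filenames
        .apply("MatchFiles", FileIO.matchAll())    // name or glob -> MatchResult.Metadata
        .apply("OpenFiles", FileIO.readMatches())  // Metadata -> FileIO.ReadableFile
        .apply("ReadLines", TextIO.readFiles());   // ReadableFile -> one String per line
  }

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    options.as(StreamingOptions.class).setStreaming(true);
    Pipeline p = Pipeline.create(options);

    PCollection<String> filenames =
        p.apply("ReadFilenames", PubsubIO.readStrings().fromSubscription(SUBSCRIPTION));
    PCollection<String> lines = readLines(filenames);
    // Parse the events out of `lines` here.

    p.run();
  }
}
```

Keeping the file-reading steps in a helper that takes any `PCollection<String>` means the same code can be exercised with `Create.of(...)` on the DirectRunner instead of a live Pub/Sub subscription.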