I have a streaming pipeline hooked up to pub/sub that publishes filenames of GCS files. From there I want to read each file and parse out the events on each line (the events are what I ultimately want to process).
Can I use TextIO for this? Specifically, can it be used in a streaming pipeline when the filename is only known during execution (as opposed to using TextIO as a source, where the filename(s) are known at construction)? If not, I'm thinking of doing something like the following:
1. Get the topic from pub/sub
2. ParDo to read each file and get the lines
3. Process the lines of the file...
Could I use the FileBasedReader or something similar in this case to read the files? The files aren't too big so I wouldn't need to parallelize the reading of a single file, but I would need to read a lot of files.
1 Solution
#1
You can use the TextIO.readAll() transform, which was recently added to Beam in #3443. For example:
PCollection<String> filenames = p.apply(PubsubIO.readStrings()...);
PCollection<String> lines = filenames.apply(TextIO.readAll());
This will read all lines in each file arriving over pubsub.
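If you would rather read each file by hand in a ParDo, as the question describes, the per-file work is just a sequential line read. Below is a minimal sketch of that step in plain Java, assuming local paths and UTF-8 text. `FileLineReader` and `readEvents` are hypothetical names used only for illustration; inside a Beam DoFn you would open gs:// paths through Beam's own file system support rather than java.nio, and emit each line to the output instead of collecting into a list.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: the per-file logic a hand-written ParDo would run
// for each filename it receives. Not part of Beam.
class FileLineReader {

    // Reads one file front to back and returns its lines (one event per line).
    // Assumption: the file is UTF-8 text and small enough to read sequentially.
    static List<String> readEvents(String filename) throws IOException {
        List<String> events = new ArrayList<>();
        try (BufferedReader reader =
                Files.newBufferedReader(Paths.get(filename), StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                events.add(line); // in a DoFn this would be an output call instead
            }
        }
        return events;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for a file whose name arrived in a message.
        Path tmp = Files.createTempFile("events", ".txt");
        Files.write(tmp, Arrays.asList("event-a", "event-b"), StandardCharsets.UTF_8);
        for (String event : readEvents(tmp.toString())) {
            System.out.println(event);
        }
        Files.delete(tmp);
    }
}
```

Since each file is read by a single call, parallelism comes from many filenames being processed at once, which matches the "lots of small files" case in the question. TextIO.readAll() handles this for you, so the manual version is mainly useful if you need custom per-file handling.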