I have a directory on GCS or another supported filesystem to which new files are being written by an external process.
我在GCS或其他受支持的文件系统上有一个目录,外部进程正在向该文件系统写入新文件。
I would like to write an Apache Beam streaming pipeline that continuously watches this directory for new files and reads and processes each new file as it arrives. Is this possible?
我想编写一个Apache Beam流管道,它不断地在这个目录中查看新文件,并在每个新文件到达时读取和处理它们。这可能吗?
1 个解决方案
#1
3
This is possible starting with Apache Beam 2.2.0. Several APIs support this use case:
这可以从Apache Beam 2.2.0开始。有几个API支持这个用例:
If you're using TextIO
or AvroIO
, they support this explicitly via TextIO.read().watchForNewFiles()
and the same on readAll()
, for example:
如果你正在使用TextIO或AvroIO,他们通过TextIO.read()。watchForNewFiles()显式支持这一点,并且在readAll()上也是如此,例如:
PCollection<String> lines = p.apply(TextIO.read()
.from("gs://path/to/files/*")
.watchForNewFiles(
// Check for new files every 30 seconds
Duration.standardSeconds(30),
// Never stop checking for new files
Watch.Growth.<String>never()));
If you're using a different file format, you may use FileIO.match().continuously()
and FileIO.matchAll().continuously()
which support the same API, in combination with FileIO.readMatches()
.
如果您使用的是其他文件格式,则可以与FileIO.readMatches()一起使用FileIO.match()。continuous()和FileIO.matchAll()。continuous(),它们支持相同的API。
The APIs support specifying how often to check for new files, and when to stop checking (supported conditions are e.g. "if no new output appears within a given time", "after observing N outputs", "after a given time since starting to check" and their combinations).
API支持指定检查新文件的频率,以及何时停止检查(支持的条件是例如“如果在给定时间内没有出现新输出”,“观察N个输出后”,“自开始检查后的给定时间后” “和他们的组合)。
Note that right now this feature currently works only in the Direct runner and the Dataflow runner, and only in the Java SDK. In general, it will work in any runner that supports Splittable DoFn (see capability matrix).
请注意,此功能目前仅适用于Direct runner和Dataflow runner,仅适用于Java SDK。通常,它适用于任何支持Splittable DoFn的运行器(请参阅功能矩阵)。
#1
3
This is possible starting with Apache Beam 2.2.0. Several APIs support this use case:
这可以从Apache Beam 2.2.0开始。有几个API支持这个用例:
If you're using TextIO
or AvroIO
, they support this explicitly via TextIO.read().watchForNewFiles()
and the same on readAll()
, for example:
如果你正在使用TextIO或AvroIO,他们通过TextIO.read()。watchForNewFiles()显式支持这一点,并且在readAll()上也是如此,例如:
PCollection<String> lines = p.apply(TextIO.read()
.from("gs://path/to/files/*")
.watchForNewFiles(
// Check for new files every 30 seconds
Duration.standardSeconds(30),
// Never stop checking for new files
Watch.Growth.<String>never()));
If you're using a different file format, you may use FileIO.match().continuously()
and FileIO.matchAll().continuously()
which support the same API, in combination with FileIO.readMatches()
.
如果您使用的是其他文件格式,则可以与FileIO.readMatches()一起使用FileIO.match()。continuous()和FileIO.matchAll()。continuous(),它们支持相同的API。
The APIs support specifying how often to check for new files, and when to stop checking (supported conditions are e.g. "if no new output appears within a given time", "after observing N outputs", "after a given time since starting to check" and their combinations).
API支持指定检查新文件的频率,以及何时停止检查(支持的条件是例如“如果在给定时间内没有出现新输出”,“观察N个输出后”,“自开始检查后的给定时间后” “和他们的组合)。
Note that right now this feature currently works only in the Direct runner and the Dataflow runner, and only in the Java SDK. In general, it will work in any runner that supports Splittable DoFn (see capability matrix).
请注意,此功能目前仅适用于Direct runner和Dataflow runner,仅适用于Java SDK。通常,它适用于任何支持Splittable DoFn的运行器(请参阅功能矩阵)。