Writing a stream to GCS per element using Apache Beam

Time: 2021-03-22 19:19:46

My current Beam pipeline reads files as a stream using FileIO.matchAll().continuously(), which returns a PCollection of file metadata. I want to write these files back, under the same names, to another GCS bucket: each element of the PCollection is one file (metadata/ReadableFile) that should be written to the other bucket after some processing. Is there a sink I should use to write each PCollection element back to GCS, or is there another way to do it? Is it possible to create a window per element and then use some GCS sink IO for this? Also, when operating on a window (even one with multiple elements), does Beam guarantee that the window is either fully processed or not processed at all? In other words, are write operations to GCS or BigQuery for a given window atomic, rather than partial, if a failure occurs?

1 answer



Can you simply write a DoFn<ReadableFile, Void> that takes the file and copies it to the desired location using the FileSystems API? You don't need a "sink" to do that. In any case, this is all that "sinks" (TextIO.write(), AvroIO.write(), etc.) are under the hood: they are simply Beam transforms made of ParDos and GroupByKeys.
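
A minimal sketch of such a DoFn, assuming the Beam Java SDK with the GCS filesystem module on the classpath. The class name CopyToBucketFn and the destination directory passed to it are hypothetical and not from the original post. Since FileIO.matchAll() emits file metadata, a FileIO.readMatches() step is assumed to sit between the match and this DoFn so that the elements arrive as ReadableFile.

```java
import java.io.IOException;
import java.util.Collections;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;

/** Copies each matched file, under its original file name, into a destination directory. */
class CopyToBucketFn extends DoFn<ReadableFile, Void> {
  // Hypothetical destination, e.g. "gs://example-output-bucket/processed/".
  private final String destinationDir;

  CopyToBucketFn(String destinationDir) {
    this.destinationDir = destinationDir;
  }

  @ProcessElement
  public void processElement(@Element ReadableFile file) throws IOException {
    // Resource that the upstream match step found.
    ResourceId source = file.getMetadata().resourceId();

    // Same file name, resolved against the destination directory.
    ResourceId destination =
        FileSystems.matchNewResource(destinationDir, true /* isDirectory */)
            .resolve(source.getFilename(), StandardResolveOptions.RESOLVE_FILE);

    // Plain copy. If the content has to be transformed first, one option is to
    // read it via file.open() and write the result through FileSystems.create(...).
    FileSystems.copy(
        Collections.singletonList(source), Collections.singletonList(destination));
  }
}
```

It would be applied with something like ParDo.of(new CopyToBucketFn("gs://example-output-bucket/processed/")) after FileIO.readMatches(). A runner may retry a bundle, so the copy can run more than once for the same file; because the destination name is derived from the source name, a retry overwrites the same object rather than creating a second one.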
