
Time: 2021-11-21 15:33:19

I'm trying to integrate a Google Cloud Dataflow pipeline with Google Cloud Pub/Sub Notifications for Google Cloud Storage. The idea is to start processing a file as soon as it is created. The messages are being published, and with the PubsubIO.readMessagesWithAttributes() source I manage to extract the file URI:


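Pipeline p = Pipeline.create(options);
PCollection<String> uris = p
        .apply(PubsubIO.readMessagesWithAttributes()
                .fromSubscription("projects/my-project/subscriptions/gcs-notifications")) // hypothetical name
        .apply(MapElements.into(TypeDescriptors.strings())
                .via((PubsubMessage msg) -> {
                    // GCS notifications carry the bucket and object name as message attributes
                    String bucket = msg.getAttribute("bucketId");
                    String object = msg.getAttribute("objectId");
                    GcsPath uri = GcsPath.fromComponents(bucket, object);
                    return uri.toString();
                }));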

Which PTransform can be used to start reading/processing each file in the uris PCollection?


2 solutions



Apache Beam at HEAD includes a PTransform that does exactly what you want: TextIO.readAll() reads a PCollection<String> of file patterns or filenames. It will be available in Beam 2.2.0; until then, you can build a snapshot of Beam yourself from the GitHub repo and depend on that.
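A minimal sketch of wiring the question's URI stream into TextIO.readAll(), assuming Beam 2.2.0 or later with the Pub/Sub IO module on the classpath. The subscription name `projects/my-project/subscriptions/gcs-notifications` is hypothetical, and the gs:// URI is built by plain string concatenation rather than GcsPath so the mapping is easy to test. In later Beam releases readAll() is deprecated; the equivalent chain there is FileIO.matchAll(), then FileIO.readMatches(), then TextIO.readFiles().

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ReadNewGcsFiles {
  /** Turns a GCS Pub/Sub notification into the gs:// URI of the object it describes. */
  static String toGcsUri(PubsubMessage msg) {
    return "gs://" + msg.getAttribute("bucketId") + "/" + msg.getAttribute("objectId");
  }

  /** Reads every line of every file named in {@code uris}. */
  static PCollection<String> readLines(PCollection<String> uris) {
    return uris.apply("ReadEachFile", TextIO.readAll());
  }

  public static void main(String[] args) {
    StreamingOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingOptions.class);
    options.setStreaming(true); // Pub/Sub is an unbounded source
    Pipeline p = Pipeline.create(options);

    PCollection<String> uris = p
        .apply(PubsubIO.readMessagesWithAttributes()
            // hypothetical subscription attached to the bucket's notification topic
            .fromSubscription("projects/my-project/subscriptions/gcs-notifications"))
        .apply(MapElements.into(TypeDescriptors.strings())
            .via((PubsubMessage msg) -> toGcsUri(msg)));

    PCollection<String> lines = readLines(uris);
    // Apply your per-line processing to `lines` here.

    p.run();
  }
}
```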




Combining Cloud Storage change notifications with Google Cloud Functions should be a good option (still in beta, though).


Using Cloud Functions, you can launch a Dataflow job with some JavaScript code. There is a very good blog post on this that should get you on your way. Your Dataflow job will kick off whenever a new file lands in a bucket or a file changes, and will process those files.
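The answer suggests a JavaScript Cloud Function; to keep one language here, this is a Java sketch of only the request such a function might send to the Dataflow REST API method projects.locations.templates.launch to start a templated job for one new object. The project, region, template path, job name and the `inputFile` template parameter are all hypothetical; the template must already be staged in GCS, and actually sending the POST (with an OAuth 2.0 access token) is left out.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Builds (but does not send) a Dataflow templates.launch request for one new GCS object. */
public class TemplateLaunchRequest {
  /** Quotes a string as a JSON string literal, escaping quotes, backslashes and control chars. */
  static String jsonString(String s) {
    StringBuilder sb = new StringBuilder("\"");
    for (char c : s.toCharArray()) {
      switch (c) {
        case '"': sb.append("\\\""); break;
        case '\\': sb.append("\\\\"); break;
        default:
          if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
          else sb.append(c);
      }
    }
    return sb.append('"').toString();
  }

  /** POST target for projects.locations.templates.launch; the template is passed as gcsPath. */
  static String launchUrl(String project, String region, String templatePath) {
    return "https://dataflow.googleapis.com/v1b3/projects/" + project
        + "/locations/" + region + "/templates:launch?gcsPath="
        + URLEncoder.encode(templatePath, StandardCharsets.UTF_8);
  }

  /** Request body: job name plus a hypothetical "inputFile" parameter naming the new object. */
  static String launchBody(String jobName, String bucket, String object) {
    String inputFile = "gs://" + bucket + "/" + object;
    return "{\"jobName\":" + jsonString(jobName)
        + ",\"parameters\":{\"inputFile\":" + jsonString(inputFile) + "}}";
  }

  public static void main(String[] args) {
    System.out.println(launchUrl("my-project", "us-central1", "gs://my-bucket/templates/process-file"));
    System.out.println(launchBody("process-file-1", "incoming", "data/report.csv"));
  }
}
```

In a real function, the bucket and object names would come from the Cloud Storage event that triggered it, and the job name would need to be unique per launch.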


If you want to stick with your approach, you might want to use the Google Cloud Storage Java SDK to read the files in a custom DoFn. I'm not sure that approach is preferable, though.
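A minimal sketch of such a DoFn, assuming a recent google-cloud-storage client (for `BlobId.fromGsUtilUri`) and Beam's annotation-based DoFn API. The class name `ReadGcsObjectFn` is hypothetical. It reads each object fully into memory before splitting it into lines, so it only suits files that comfortably fit on one worker.

```java
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

/** Hypothetical DoFn: reads the GCS object named by each gs:// URI and emits its lines. */
public class ReadGcsObjectFn extends DoFn<String, String> {
  // The Storage client is not serializable, so build it on the worker in @Setup.
  private transient Storage storage;

  /** Splits UTF-8 bytes on any line break; an empty object yields no lines. */
  static List<String> splitLines(byte[] bytes) {
    if (bytes.length == 0) {
      return Collections.emptyList();
    }
    return Arrays.asList(new String(bytes, StandardCharsets.UTF_8).split("\\R"));
  }

  @Setup
  public void setup() {
    storage = StorageOptions.getDefaultInstance().getService();
  }

  @ProcessElement
  public void processElement(@Element String gcsUri, OutputReceiver<String> out) {
    // gcsUri is expected to look like gs://bucket/object
    byte[] bytes = storage.readAllBytes(BlobId.fromGsUtilUri(gcsUri));
    for (String line : splitLines(bytes)) {
      out.output(line);
    }
  }
}
```

It would be applied to the question's collection as `uris.apply(ParDo.of(new ReadGcsObjectFn()))`.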
