如何使用pubsub通知进行云存储以触发数据流管道

时间:2022-02-09 15:23:04

I'm trying to integrate a Google Cloud Dataflow pipeline with Google Cloud Pub/Sub Notifications for Google Cloud Storage. The idea is start processing a file as soon it is created. The messages are being published and with PubsubIO.readMessagesWithAttributes() source I manage to extract the file URI:

我正在尝试将Google Cloud Dataflow管道与Google Cloud Pub / Sub Notifications for Google Cloud Storage集成。这个想法是在创建文件后立即开始处理文件。消息正在发布,并且使用PubsubIO.readMessagesWithAttributes()源我设法提取文件URI:

Pipeline p = Pipeline.create(options);
PCollection<String> uris = p.apply(PubsubIO.readMessagesWithAttributes()
            .withTimestampAttribute(PUBSUB_TIMESTAMP_LABEL_KEY)
            .fromSubscription(options.getPubsubSubscription()))
            .apply(MapElements
                    .into(TypeDescriptors.strings())
                    .via((PubsubMessage msg) -> {
                        String bucket = msg.getAttribute("bucketId");
                        String object = msg.getAttribute("objectId");
                        GcsPath uri = GcsPath.fromComponents(bucket, object);
                        return uri.toString();
                    }));

Which PTransform can be used to start reading/processing each file in the uris PCollection?

哪个PTransform可用于开始读取/处理uris PCollection中的每个文件?

2 个解决方案

#1


2  

Apache Beam at HEAD includes a PTransform that does exactly what you want: TextIO.readAll() reads a PCollection<String> of filepatterns or filenames. It will be available in Beam 2.2.0, but for now you can just build a snapshot of Beam yourself from the github repo and depend on that.

HEAD中的Apache Beam包含一个完全符合您要求的PTransform:TextIO.readAll()读取文件模式或文件名的PCollection 。它将在Beam 2.2.0中提供,但是现在你可以从github repo自己构建一个Beam快照并依赖于它。

#2


0  

Combing Cloud Storage change notifications with Google Cloud Functions should be a good option (still in beta though).

将云端存储更改通知与Google云端功能相结合应该是一个不错的选择(尽管仍处于测试阶段)。

Using Cloud Functions you can launch a Dataflow job using some Javascript code. This is a very good blogpost that should get you on the way. You Dataflow job will kick-off whenever a new file lands in a bucket or a file changes and will process these files.

使用云功能,您可以使用一些Javascript代码启动Dataflow作业。这是一个非常好的博客文章,应该让你在路上。只要新文件落入存储桶或文件发生更改并处理这些文件,您就会启动数据流作业。

If you want to stick to your approach, you might want to use the Google Cloud Storage Java SDK to read the files in a custom DoFn. Not sure if that approach is preferable though.

如果您想坚持自己的方法,可能需要使用Google Cloud Storage Java SDK来读取自定义DoFn中的文件。不确定这种方法是否更可取。

#1


2  

Apache Beam at HEAD includes a PTransform that does exactly what you want: TextIO.readAll() reads a PCollection<String> of filepatterns or filenames. It will be available in Beam 2.2.0, but for now you can just build a snapshot of Beam yourself from the github repo and depend on that.

HEAD中的Apache Beam包含一个完全符合您要求的PTransform:TextIO.readAll()读取文件模式或文件名的PCollection 。它将在Beam 2.2.0中提供,但是现在你可以从github repo自己构建一个Beam快照并依赖于它。

#2


0  

Combing Cloud Storage change notifications with Google Cloud Functions should be a good option (still in beta though).

将云端存储更改通知与Google云端功能相结合应该是一个不错的选择(尽管仍处于测试阶段)。

Using Cloud Functions you can launch a Dataflow job using some Javascript code. This is a very good blogpost that should get you on the way. You Dataflow job will kick-off whenever a new file lands in a bucket or a file changes and will process these files.

使用云功能,您可以使用一些Javascript代码启动Dataflow作业。这是一个非常好的博客文章,应该让你在路上。只要新文件落入存储桶或文件发生更改并处理这些文件,您就会启动数据流作业。

If you want to stick to your approach, you might want to use the Google Cloud Storage Java SDK to read the files in a custom DoFn. Not sure if that approach is preferable though.

如果您想坚持自己的方法,可能需要使用Google Cloud Storage Java SDK来读取自定义DoFn中的文件。不确定这种方法是否更可取。