I'm trying to integrate a Google Cloud Dataflow pipeline with Google Cloud Pub/Sub Notifications for Google Cloud Storage. The idea is to start processing a file as soon as it is created. The messages are being published, and with the PubsubIO.readMessagesWithAttributes() source I manage to extract the file URI:
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

Pipeline p = Pipeline.create(options);

// Map each GCS notification to a "gs://bucket/object" URI
// using the bucketId/objectId message attributes.
PCollection<String> uris = p.apply(PubsubIO.readMessagesWithAttributes()
        .withTimestampAttribute(PUBSUB_TIMESTAMP_LABEL_KEY)
        .fromSubscription(options.getPubsubSubscription()))
    .apply(MapElements
        .into(TypeDescriptors.strings())
        .via((PubsubMessage msg) -> {
          String bucket = msg.getAttribute("bucketId");
          String object = msg.getAttribute("objectId");
          GcsPath uri = GcsPath.fromComponents(bucket, object);
          return uri.toString();
        }));
Which PTransform can be used to start reading/processing each file in the uris PCollection?
2 Answers
#1
Apache Beam at HEAD includes a PTransform that does exactly what you want: TextIO.readAll() reads a PCollection<String> of filepatterns or filenames. It will be available in Beam 2.2.0, but for now you can just build a snapshot of Beam yourself from the GitHub repo and depend on that.
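For example, continuing from the uris collection above (a minimal sketch; TextIO is org.apache.beam.sdk.io.TextIO):

// Each element of uris is treated as a filepattern; TextIO.readAll()
// matches it and emits every line of every matched file.
PCollection<String> lines = uris.apply(TextIO.readAll());

From there, lines can be processed like any other streaming PCollection.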
#2
Combining Cloud Storage change notifications with Google Cloud Functions should be a good option (still in beta, though).
Using Cloud Functions, you can launch a Dataflow job with some JavaScript code. This is a very good blog post that should get you on your way. Your Dataflow job will kick off whenever a new file lands in a bucket or a file changes, and it will process those files.
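The blog post drives this from JavaScript inside the Cloud Function, but the underlying call is Dataflow's templates.launch REST method. For reference, here is a rough sketch of that same call in Java, assuming the com.google.apis:google-api-services-dataflow client; the project, template path, job name, and parameter names are all hypothetical:

import java.util.Collections;
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.LaunchTemplateParameters;
import com.google.api.services.dataflow.model.LaunchTemplateResponse;

public class LaunchDataflowFromNotification {
  public static void main(String[] args) throws Exception {
    // Hypothetical values; substitute your own project and template.
    String project = "my-project";
    String templatePath = "gs://my-bucket/templates/my-template";

    // Build an authenticated Dataflow API client.
    Dataflow dataflow = new Dataflow.Builder(
            GoogleNetHttpTransport.newTrustedTransport(),
            JacksonFactory.getDefaultInstance(),
            GoogleCredential.getApplicationDefault().createScoped(
                Collections.singletonList("https://www.googleapis.com/auth/cloud-platform")))
        .setApplicationName("gcs-triggered-launcher")
        .build();

    // Launch a templated job, passing the new file as a parameter.
    LaunchTemplateParameters launchParams = new LaunchTemplateParameters()
        .setJobName("process-new-file")
        .setParameters(Collections.singletonMap("inputFile", "gs://my-bucket/new-file.csv"));

    LaunchTemplateResponse response = dataflow.projects().templates()
        .launch(project, launchParams)
        .setGcsPath(templatePath)
        .execute();

    System.out.println("Launched job: " + response.getJob().getId());
  }
}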
If you want to stick to your approach, you might want to use the Google Cloud Storage Java SDK to read the files in a custom DoFn. Not sure if that approach is preferable though.
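A rough sketch of what that custom DoFn could look like, assuming the com.google.cloud:google-cloud-storage client library (the class name is illustrative, and this reads each object fully into memory, so it only suits small files):

import java.nio.charset.StandardCharsets;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical DoFn that reads each "gs://bucket/object" URI emitted
// by the MapElements step above and outputs the file contents.
class ReadGcsFileFn extends DoFn<String, String> {
  private transient Storage storage;

  @Setup
  public void setup() {
    storage = StorageOptions.getDefaultInstance().getService();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Split "gs://bucket/object" into bucket and object name.
    String path = c.element().substring("gs://".length());
    int slash = path.indexOf('/');
    BlobId blobId = BlobId.of(path.substring(0, slash), path.substring(slash + 1));

    // Read the whole object into memory and emit it as a string.
    byte[] contents = storage.readAllBytes(blobId);
    c.output(new String(contents, StandardCharsets.UTF_8));
  }
}

It could then be applied as uris.apply(ParDo.of(new ReadGcsFileFn())).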