I have multiple Cloud PubSub subscriptions to read from, matched by a prefix pattern, using Apache Beam. I extend the PTransform class and implement the expand() method to read from the multiple subscriptions, then apply a Flatten transform to the resulting PCollectionList (one PCollection from each subscription). My problem is passing the subscription prefix as a ValueProvider into the expand() method, since expand() is called at template creation time, not when the job is launched. However, if I use only one subscription, I can pass a ValueProvider into PubsubIO.readStrings().fromSubscription().
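For reference, the single-subscription version that works looks roughly like this; MyOptions and the "subscription" option name are placeholders for the standard templated-options pattern, not code from my actual pipeline.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;

public class SingleSubscriptionExample {

    // Assumed options interface for a templated pipeline; the option name
    // "subscription" is hypothetical.
    public interface MyOptions extends PipelineOptions {
        ValueProvider<String> getSubscription();
        void setSubscription(ValueProvider<String> value);
    }

    public static void main(String[] args) {
        MyOptions options =
                PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
        Pipeline pipeline = Pipeline.create(options);

        // PubsubIO accepts the ValueProvider directly and defers reading it
        // until the job is launched, so one subscription works in a template.
        pipeline.apply("ReadOne",
                PubsubIO.readStrings().fromSubscription(options.getSubscription()));

        pipeline.run();
    }
}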
Here's some sample code for my multi-subscription transform.
import java.util.ArrayList;
import java.util.List;

import javax.annotation.Nullable;

import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class MultiPubSubIO extends PTransform<PBegin, PCollection<PubsubMessage>> {

    private final ValueProvider<String> prefixPubsub;

    public MultiPubSubIO(@Nullable String name, ValueProvider<String> prefixPubsub) {
        super(name);
        this.prefixPubsub = prefixPubsub;
    }

    @Override
    public PCollection<PubsubMessage> expand(PBegin input) {
        List<String> myList = null;
        try {
            // prefixPubsub.get() fails here: expand() runs at template
            // creation time, before the runtime value exists.
            // PubsubHelper is my own helper that lists subscriptions.
            myList = PubsubHelper.getAllSubscription("projectID", prefixPubsub.get());
        } catch (Exception e) {
            LogHelper.error(String.format("Error getting list of subscriptions: %s", e));
        }

        List<PCollection<PubsubMessage>> collectionList = new ArrayList<>();
        if (myList != null && !myList.isEmpty()) {
            // Read from each matching subscription, then flatten everything
            // into a single PCollection.
            for (String subs : myList) {
                PCollection<PubsubMessage> pCollection = input.apply("ReadPubSub",
                        PubsubIO.readMessagesWithAttributes().fromSubscription(subs));
                collectionList.add(pCollection);
            }
            return PCollectionList.of(collectionList)
                    .apply("FlattenPcollections", Flatten.pCollections());
        } else {
            LogHelper.error(String.format("No subscription with prefix %s found", prefixPubsub));
            return null;
        }
    }

    public static MultiPubSubIO read(ValueProvider<String> prefixPubsub) {
        return new MultiPubSubIO(null, prefixPubsub);
    }
}
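A hypothetical launch site for this transform, reusing the assumed templated-options pattern from the earlier sketch (getPrefixPubsub() is a placeholder name):

// Hypothetical usage; getPrefixPubsub() is an assumed option getter.
// At template creation time the ValueProvider is still a placeholder,
// which is exactly why prefixPubsub.get() inside expand() fails.
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("ReadAllMatching", MultiPubSubIO.read(options.getPrefixPubsub()));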
So I'm wondering how to read from a ValueProvider the same way PubsubIO.read().fromSubscription() does. Or am I missing something?
Searched links:
- extract-value-from-valueprovider-in-apache-beam - the answer suggests using a DoFn, while I need a PTransform that receives PBegin.
1 Answer
#1
Unfortunately this is not possible currently:
- It is not possible for the value of a ValueProvider to affect transform expansion: at expansion time it is unknown, and by the time it is known, the pipeline shape is already fixed (see the sketch after this list).
- There is currently no transform like PubsubIO.read() that can accept a PCollection of topic names. Eventually there will be (it is enabled by [Splittable DoFn](http://s.apache.org/splittable-do-fn)), but it will take a while - nobody is working on this currently.
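To make the expansion-time vs. run-time distinction concrete, here is a minimal illustrative sketch (LogPrefixFn is an assumed name, not an existing API): a ValueProvider can be carried into a DoFn and read lazily at run time, as the linked DoFn answer suggests, but it cannot be read inside expand(), which runs while the pipeline shape is still being decided.

import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;

// Illustrative sketch: carrying a ValueProvider into a DoFn and reading it
// lazily. LogPrefixFn is a hypothetical class, not part of Beam.
class LogPrefixFn extends DoFn<String, String> {
    private final ValueProvider<String> prefix;

    LogPrefixFn(ValueProvider<String> prefix) {
        this.prefix = prefix;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // Legal: @ProcessElement runs after the job has launched, so the
        // runtime value is available by the time get() is called.
        c.output(prefix.get() + c.element());
    }
}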