Google Dataflow: output messages to only one of several PubSub topics based on a condition

Time: 2021-04-21 15:35:09

In my pipeline I want to output each message to one of several PubSub topics, based on the result of a previous transformation. At the moment I'm sending all output to the same topic:

 SearchItemGeneratorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(SearchItemGeneratorOptions.class);
 Pipeline p = Pipeline.create(options);
 p.apply(...)
 //other transformations 
 .apply("ParseFile", new ParseFile()) // outputs PCollection<Message>, where each Message has a MessageType property with the name of the topic.
 .apply("WriteItemsToTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));

And this is my Message object:

class Message {
    private MessageType messageType;
    private String payload;
    // constructor, getters
}

My ParseFile transform outputs a PCollection<Message>, and each Message object has a messageType property. Based on messageType, I want to send the Message's payload property to a different PubSub topic. I read the paragraph "Multiple transforms process the same PCollection" in this article, but I still don't see how to apply it, or another solution, in my case.
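The "Multiple transforms process the same PCollection" pattern is branching: every transform applied to the same PCollection receives every element, so each branch has to filter for its own message type (in Beam, for example, with a Filter transform) before writing to its topic. The sketch below models that idea in plain Java, without Beam; MessageType.FULL/DELTA and the sample payloads are hypothetical stand-ins for the question's types.

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java model of branching (not Beam code): every branch reads the whole input.
public class BranchingSketch {
    // Hypothetical stand-ins for the question's MessageType and Message.
    enum MessageType { FULL, DELTA }

    static final class Message {
        private final MessageType messageType;
        private final String payload;

        Message(MessageType messageType, String payload) {
            this.messageType = messageType;
            this.payload = payload;
        }

        MessageType getMessageType() { return messageType; }

        String getPayload() { return payload; }
    }

    // One branch: sees every element and keeps only the payloads of the wanted type.
    static List<String> branch(List<Message> input, MessageType wanted) {
        return input.stream()
                .filter(m -> m.getMessageType() == wanted)
                .map(Message::getPayload)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Message> parsed = List.of(
                new Message(MessageType.FULL, "a"),
                new Message(MessageType.DELTA, "b"),
                new Message(MessageType.FULL, "c"));

        // Both branches consume the same input; the filters make the outputs disjoint.
        System.out.println(branch(parsed, MessageType.FULL));  // [a, c]
        System.out.println(branch(parsed, MessageType.DELTA)); // [b]
    }
}
```

The trade-off is that each message is examined once per branch; Partition or tagged outputs route each message in a single pass.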

Update: Thanks @Andrew for your solution. I solved my issue using TupleTag instead, but the approach is similar. I created two different TupleTag objects in the main pipeline class:

public static final TupleTag<String> full = new TupleTag<>("full");
public static final TupleTag<String> delta = new TupleTag<>("delta");

Then, based on my condition, I output each message from the DoFn with the matching TupleTag:

// isFull(...) is a placeholder for your own condition; pick the full or delta tag
TupleTag<String> tupleTag = isFull(jsonObject) ? full : delta;
processContext.output(tupleTag, jsonObject.toString());

Then, in the main pipeline, I select each output from the PCollectionTuple by its TupleTag and send it to the matching Pub/Sub topic:

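// parsed and RouteFn are illustrative names for the input PCollection and the DoFn above;
// withOutputTags(...) is what turns the ParDo output into a PCollectionTuple
PCollectionTuple messages = parsed.apply("RouteMessages",
            ParDo.of(new RouteFn()).withOutputTags(full, TupleTagList.of(delta)));

messages.get(full)
            .apply("SendToIndexTopic", PubsubIO.writeStrings().to(options.getOutputIndexTopic()));

messages.get(delta)
            .apply("SendToDeltaTopic", PubsubIO.writeStrings().to(options.getOutputDeltaTopic()));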

The only thing worth mentioning is that my TupleTag objects are static fields.
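The tags above are created with `new TupleTag<>("full")`. Beam's documentation for multiple outputs creates tags as anonymous subclasses instead, e.g. `new TupleTag<String>(){}`, so that the element type is still available at runtime for coder inference; if Beam reports that it cannot infer a coder for the additional output, this is a likely cause. The plain-Java sketch below is not Beam's TupleTag (the `Tag<V>` class is hypothetical); it only shows the underlying Java mechanism.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Plain-Java illustration: an anonymous subclass records its type argument, a plain instance does not.
public class TypeCaptureSketch {
    // Hypothetical generic tag class standing in for something like TupleTag<V>.
    static class Tag<V> {
        final String id;

        Tag(String id) {
            this.id = id;
        }

        // Returns the type argument recorded in the class hierarchy, or null if it was erased.
        Type capturedType() {
            Type superType = getClass().getGenericSuperclass();
            if (superType instanceof ParameterizedType) {
                return ((ParameterizedType) superType).getActualTypeArguments()[0];
            }
            // A plain "new Tag<>(...)" has Tag as its class, whose superclass is Object.
            return null;
        }
    }

    public static void main(String[] args) {
        Tag<String> plain = new Tag<>("full");
        Tag<String> anonymous = new Tag<String>("delta") {};

        System.out.println(plain.capturedType());     // null
        System.out.println(anonymous.capturedType()); // class java.lang.String
    }
}
```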

1 Answer

#1

You can partition the PCollection to publish messages to multiple Pub/Sub topics. Partitioning splits the messages so that each one goes to exactly one topic instead of being duplicated to every topic. You need to know all of the Pub/Sub topics ahead of time, because the number of partitions is fixed when the pipeline is constructed. Reference: Partition.

Example:

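import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Partition;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptors;

// partition the output of ParseFile (not the Pipeline object itself)

PCollection<Message> parsed = p.apply(...)
        .apply("ParseFile", new ParseFile());

PCollectionList<Message> msgs = parsed.apply("PartitionByType",
        Partition.of(2, new Partition.PartitionFn<Message>() {
            @Override
            public int partitionFor(Message msg, int numPartitions) {
                // TODO: determine how to partition messages.
                // messageType is an enum, so compare with ==; MessageType.FULL is a placeholder constant.
                return msg.getMessageType() == MessageType.FULL ? 0 : 1;
            }
        }));

// access partitions; PubsubIO.writeStrings() needs a PCollection<String>, so extract the payload first

msgs.get(0)
        .apply("ExtractPayload1", MapElements.into(TypeDescriptors.strings()).via((Message msg) -> msg.getPayload()))
        .apply("WriteItemsToTopic1", PubsubIO.writeStrings().to(options.getOutputTopic1()));

msgs.get(1)
        .apply("ExtractPayload2", MapElements.into(TypeDescriptors.strings()).via((Message msg) -> msg.getPayload()))
        .apply("WriteItemsToTopic2", PubsubIO.writeStrings().to(options.getOutputTopic2()));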
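The partition function above hard-codes two partitions. Assuming MessageType is an enum with one constant per topic, one option is to derive the partition count from the enum and use the constant's ordinal as the index; Beam's Partition expects the returned index to lie in [0, numPartitions). The plain-Java model below is not Beam code (FULL/DELTA and the payloads are hypothetical); it shows the semantics: every element lands in exactly one partition, and an out-of-range index is rejected.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of Partition semantics (not Beam code).
public class PartitionSketch {
    // Hypothetical stand-ins for the question's types; one enum constant per output topic.
    enum MessageType { FULL, DELTA }

    static final class Message {
        private final MessageType messageType;
        private final String payload;

        Message(MessageType messageType, String payload) {
            this.messageType = messageType;
            this.payload = payload;
        }

        MessageType getMessageType() { return messageType; }

        String getPayload() { return payload; }
    }

    // Same shape as a partition function: element and partition count in, index out.
    interface PartitionFunction<T> {
        int partitionFor(T element, int numPartitions);
    }

    // Sends each element to exactly one output list and rejects out-of-range indices.
    static <T> List<List<T>> partition(List<T> input, int numPartitions, PartitionFunction<T> fn) {
        List<List<T>> outputs = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            outputs.add(new ArrayList<>());
        }
        for (T element : input) {
            int index = fn.partitionFor(element, numPartitions);
            if (index < 0 || index >= numPartitions) {
                throw new IndexOutOfBoundsException("Partition index out of range: " + index);
            }
            outputs.get(index).add(element);
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<Message> parsed = List.of(
                new Message(MessageType.FULL, "a"),
                new Message(MessageType.DELTA, "b"),
                new Message(MessageType.FULL, "c"));

        // Derive the partition count from the enum so every MessageType has its own output.
        int numPartitions = MessageType.values().length;
        List<List<Message>> parts =
                partition(parsed, numPartitions, (msg, n) -> msg.getMessageType().ordinal());

        for (int i = 0; i < numPartitions; i++) {
            List<String> payloads = new ArrayList<>();
            for (Message m : parts.get(i)) {
                payloads.add(m.getPayload());
            }
            // Prints "FULL -> [a, c]", then "DELTA -> [b]"
            System.out.println(MessageType.values()[i] + " -> " + payloads);
        }
    }
}
```

Using ordinal() ties each partition to the declaration order of the enum constants, so reordering the enum would silently change which topic each partition is written to; an explicit switch over the constants is more robust.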
