How to deduplicate messages from GCP PubSub in Dataflow using Apache Beam's PubSubIO withIdAttribute

Time: 2022-07-09 15:21:56

I'm currently attempting to use withIdAttribute with PubSubIO to deduplicate messages that come from PubSub (since PubSub only guarantees at least once delivery).

My messages have four fields, label1, label2, timestamp, and value. A value is unique to the two labels at some timestamp. Therefore, I additionally set a uniqueID attribute before writing to PubSub equal to these three values joined as a string.
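The uniqueID attribute described above can be built by joining the three fields with a separator. A minimal sketch (the buildUniqueId helper name is hypothetical; the field order matches the attribute shown in the console output below):

```java
// Sketch: build the uniqueID attribute before publishing to Pub/Sub.
// The buildUniqueId helper is hypothetical; field order is
// label1:label2:timestamp, as in the attribute shown below.
public class UniqueIdExample {
    static String buildUniqueId(String label1, String label2, long timestamp) {
        return String.join(":", label1, label2, Long.toString(timestamp));
    }

    public static void main(String[] args) {
        String id = buildUniqueId(
                "5c381a51-2873-49b8-acf5-60a0fa59fc65", "foobarbaz", 1513199383L);
        System.out.println(id);
        // → 5c381a51-2873-49b8-acf5-60a0fa59fc65:foobarbaz:1513199383
    }
}
```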

For example, this is what I get from reading from a subscription using the gcp console tool.

┌───────────────────────────────────────────────────────────────────────────────────────────────────────────┬────────────────┬───────────────────────────────────────────────────────────────────────────────────────────────────┐
│                                                    DATA                                                   │   MESSAGE_ID   │                                               ATTRIBUTES                                          │
├───────────────────────────────────────────────────────────────────────────────────────────────────────────┼────────────────┼───────────────────────────────────────────────────────────────────────────────────────────────────┤
│ {"label1":"5c381a51-2873-49b8-acf5-60a0fa59fc65","label2":"foobarbaz","timestamp":1513199383,"value":4.2} │ 11185357338249 │ eventTime=2017-12-13T21:09:43Z uniqueID=5c381a51-2873-49b8-acf5-60a0fa59fc65:foobarbaz:1513199383 │
└───────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────┴───────────────────────────────────────────────────────────────────────────────────────────────────┘

In my Beam job, running on GCP Dataflow, I decode these messages as JSON, window them, group them by their two labels, and then attempt to aggregate them. However, in my aggregation class CreateMyAggregationsFn I'm seeing duplicate messages that have the same label1, label2, and timestamp.

public class MyBeam {
  public interface MyBeamOptions extends PipelineOptions {
    // ...
  }

  private static class MyMessage implements Serializable {
    public long timestamp;
    public double value;
    public String label1;
    public String label2;
  }

  private static class CreateMyAggregationsFn extends DoFn<KV<String, Iterable<MyMessage>>, MyAggregate> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      ArrayList<MyMessage> messages = new ArrayList<>();
      c.element().getValue().forEach(messages::add);
      Collections.sort(messages, (msg1, msg2) -> Long.compare(msg1.timestamp, msg2.timestamp));

      MyMessage prev = null;
      for (MyMessage msg : messages) {
        if (prev != null &&
            msg.timestamp == prev.timestamp && 
            msg.label1.equals(prev.label1) && 
            msg.label2.equals(prev.label2)) {
            // ... identifying duplicates here
        }
        prev = msg;
      }
      // ...
    }
  }

  public static void main(String[] args) throws IOException {
    MyBeamOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyBeamOptions.class);
    Pipeline pipeline = Pipeline.create(options);
    PubsubIO.Read<String> pubsubReadSubscription =
        PubsubIO.readStrings()
            .withTimestampAttribute("eventTime")
            .withIdAttribute("uniqueID")
            .fromSubscription(options.getPubsubSubscription());
    pipeline
        .apply("PubsubReadSubscription", pubsubReadSubscription)
        .apply("ParseJsons", ParseJsons.of(MyMessage.class))
        .setCoder(SerializableCoder.of(MyMessage.class))
        .apply(
            "Window",
            Window.<MyMessage>into(FixedWindows.of(Duration.standardSeconds(60)))
                .triggering(
                    AfterWatermark.pastEndOfWindow()
                        .withLateFirings(AfterPane.elementCountAtLeast(1)))
                .accumulatingFiredPanes()
                .withAllowedLateness(Duration.standardSeconds(3600)))
        .apply(
            "PairMessagesWithLabels",
            MapElements.into(
                    TypeDescriptors.kvs(
                        TypeDescriptors.strings(), TypeDescriptor.of(MyMessage.class)))
                .via(msg -> KV.of(msg.label1 + ":" + msg.label2, msg)))
        .apply("GroupMessagesByLabels", GroupByKey.<String, MyMessage>create())
        .apply("CreateAggregations", ParDo.of(new CreateMyAggregationsFn()));
        // ...
    PipelineResult result = pipeline.run();
  }
}

Is there an additional step needed to deduplicate messages from PubSubIO with the withIdAttribute method that I'm missing?

1 Solution

#1

You are specifying accumulatingFiredPanes(), which means that in case of multiple firings for a window (e.g. if late data arrives) you are asking successive firings to include all the elements from previous firings, not just new elements. This by definition produces duplication. What are you trying to achieve by specifying accumulatingFiredPanes()?
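To see why accumulatingFiredPanes() re-emits elements, here is a toy simulation of the two pane accumulation modes in plain Java (this is illustrative only, not the Beam API; PaneSimulator is a made-up name):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of Beam's pane accumulation modes (not the Beam API).
// In accumulating mode each firing emits every element seen so far;
// in discarding mode each firing emits only elements received since
// the previous firing.
public class PaneSimulator {
    private final boolean accumulating;
    private final List<String> buffer = new ArrayList<>();

    PaneSimulator(boolean accumulating) { this.accumulating = accumulating; }

    void receive(String element) { buffer.add(element); }

    List<String> fire() {
        List<String> pane = new ArrayList<>(buffer);
        if (!accumulating) buffer.clear(); // discarding mode forgets fired elements
        return pane;
    }

    public static void main(String[] args) {
        PaneSimulator acc = new PaneSimulator(true);
        acc.receive("m1");
        acc.fire();                        // on-time firing: [m1]
        acc.receive("m2-late");
        System.out.println(acc.fire());    // late firing re-emits m1: [m1, m2-late]

        PaneSimulator disc = new PaneSimulator(false);
        disc.receive("m1");
        disc.fire();                       // on-time firing: [m1]
        disc.receive("m2-late");
        System.out.println(disc.fire());   // late firing: [m2-late] only
    }
}
```

In the pipeline above, switching the Window transform to .discardingFiredPanes() would make each late firing contain only the newly arrived elements, so downstream stages would not see the earlier elements again.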
