Apache Beam - Joining streams by key on two unbounded PCollections

Asked: 2021-11-23 14:41:38

I have two unbounded (KafkaIO) PCollections to which I am applying a tag-based CoGroupByKey with a fixed window of 1 minute. However, at join time the collection usually seems to be missing the data from one of the tags, even for test data with the same keys. Please find the snippet below.


    KafkaIO.Read<Integer, String> event1 = ... ;


    KafkaIO.Read<Integer, String> event2 = ...;

    PCollection<KV<String,String>> event1Data = p.apply(event1.withoutMetadata())
            .apply(Values.<String>create())
            .apply(MapElements.via(new SimpleFunction<String, KV<String, String>>() {
                @Override public KV<String, String> apply(String input) {
                    log.info("Extracting Data");
                    . . . .//Some processing
                    return KV.of(record.get("myKey"), record.get("myValue"));
                }
            }))
            .apply(Window.<KV<String,String>>into(
                    FixedWindows.of(Duration.standardMinutes(1))));

    PCollection<KV<String,String>> event2Data = p.apply(event2.withoutMetadata())
            .apply(Values.<String>create())
            .apply(MapElements.via(new SimpleFunction<String, KV<String, String>>() {
                @Override public KV<String, String> apply(String input) {
                    log.info("Extracting Data");
                    . . . .//Some processing
                    return KV.of(record.get("myKey"), record.get("myValue"));
                }
            }))
            .apply(Window.<KV<String,String>>into(
                    FixedWindows.of(Duration.standardMinutes(1))));

   final TupleTag<String> event1Tag = new TupleTag<>();
   final TupleTag<String> event2Tag = new TupleTag<>();

   PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
            .of(event1Tag, event1Data)
            .and(event2Tag, event2Data)
            .apply(CoGroupByKey.<String>create());

   PCollection<String> finalResultCollection =
            kvpCollection.apply("Join", ParDo.of(
                    new DoFn<KV<String, CoGbkResult>, String>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) throws IOException {
                            KV<String, CoGbkResult> e = c.element();
                            Iterable<String> event1Values = e.getValue().getAll(event1Tag);
                            Iterable<String> event2Values = e.getValue().getAll(event2Tag);
                            if (event1Values.iterator().hasNext() && event2Values.iterator().hasNext()) {
                                // Process event1 and event2 data and write to c.output
                            } else {
                                System.out.println("Unable to join event1 and event2");
                            }
                        }
                    }));

For the above code, when I start pumping data with a common key into the two Kafka topics, it never gets joined, i.e. I always see "Unable to join event1 and event2". Kindly let me know if I am doing anything wrong, or whether there is a better way to join two unbounded PCollections on a common key.


1 Answer

#1



I guess I somewhat figured out the issue: the default trigger was firing for the two unbounded sources at the CoGroupByKey, so whenever a new event arrived at either source it tried to apply the join operation immediately, as there were no data-driven triggers configured for my stream join pipeline. I configured the required triggering(), discardingFiredPanes() and withAllowedLateness() properties on my Window transform, which solved my stream join use case.
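The answer is terse, so here is a minimal sketch of a windowing strategy that sets the three properties it mentions (triggering(), withAllowedLateness(), discardingFiredPanes()) and is applied to both inputs before the CoGroupByKey. The specific trigger composition and the durations below are illustrative assumptions, not the answerer's exact configuration:

    import org.apache.beam.sdk.transforms.windowing.AfterPane;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;

    // Shared windowing transform; apply it to both event1Data and event2Data so the
    // two sides of the CoGroupByKey carry identical windowing strategies.
    Window<KV<String, String>> windowing =
            Window.<KV<String, String>>into(FixedWindows.of(Duration.standardMinutes(1)))
                    // Fire when the watermark passes the end of the window, with speculative
                    // early firings and one late firing per late-arriving element.
                    .triggering(AfterWatermark.pastEndOfWindow()
                            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                                    .plusDelayOf(Duration.standardSeconds(30)))
                            .withLateFirings(AfterPane.elementCountAtLeast(1)))
                    // Keep the window open for data arriving up to 5 minutes behind the
                    // watermark (illustrative value).
                    .withAllowedLateness(Duration.standardMinutes(5))
                    // Emit only the elements that arrived since the previous firing.
                    .discardingFiredPanes();

One design note: with discardingFiredPanes() each firing contains only the elements that arrived since the previous firing, so a late pane may not include the other side's earlier elements. If the join requires both tagged inputs to be present in the same pane across firings, accumulatingFiredPanes() may be the more suitable choice.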

