Google Dataflow 2.0 PubSub: handling late data

Date: 2021-02-23 15:20:33

I have an issue regarding Google Dataflow.

I'm writing a Dataflow pipeline that reads data from PubSub and writes it to BigQuery, and that part works.
Now I have to handle late data. I have been following some examples from the internet, but it's not working properly. Here is my code:

pipeline.apply(PubsubIO.readStrings()
            .withTimestampAttribute("timestamp").fromSubscription(Constants.SUBSCRIBER))
        .apply(ParDo.of(new ParseEventFn()))        
        .apply(Window.<Entity> into(FixedWindows.of(WINDOW_SIZE))
            // processing of late data.
            .triggering(
                    AfterWatermark
                            .pastEndOfWindow()
                            .withEarlyFirings(
                                    AfterProcessingTime
                                            .pastFirstElementInPane()
                                            .plusDelayOf(DELAY_SIZE))
                            .withLateFirings(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(ALLOW_LATE_SIZE)
            .accumulatingFiredPanes())
        .apply(ParDo.of(new ParseTableRow()))
        .apply("Write to BQ", BigQueryIO.<TableRow>write()...

Here is my pubsub message:

{
...,
"timestamp" : "2015-08-31T09:52:25.005Z"
}

When I manually publish some messages (by going to the PubSub topic and publishing there) whose timestamp is far older than ALLOW_LATE_SIZE allows (timestamp << ALLOW_LATE_SIZE), these messages are still passed through.

1 answer

#1


You should specify the allowed lateness explicitly as a "Duration" object: .withAllowedLateness(Duration.standardMinutes(ALLOW_LATE_SIZE)), assuming ALLOW_LATE_SIZE holds a value in minutes.

You may check the documentation page for "Google Cloud Dataflow SDK for Java", specifically the "Triggers" sub-chapter.
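
To make the role of allowed lateness more concrete, here is a minimal plain-Java sketch of how a late element could be judged against a fixed window. It is a simplified model and not the SDK's own code: the class LatenessSketch and the methods windowStart and isDropped are hypothetical names, it uses java.time instead of the Joda "Duration" the SDK expects, and the 1-minute window and 5-minute lateness are assumed values. The point it illustrates is that lateness is measured against the pipeline's watermark, not against the wall-clock time at which a message is published.

```java
import java.time.Duration;
import java.time.Instant;

public class LatenessSketch {

    /** Start of the epoch-aligned fixed window that contains the given event time. */
    static Instant windowStart(Instant eventTime, Duration windowSize) {
        long sizeMs = windowSize.toMillis();
        long t = eventTime.toEpochMilli();
        return Instant.ofEpochMilli(t - Math.floorMod(t, sizeMs));
    }

    /**
     * Simplified rule: an element is treated as droppable once the watermark has
     * moved past the last timestamp of its window plus the allowed lateness.
     */
    static boolean isDropped(Instant eventTime, Instant watermark,
                             Duration windowSize, Duration allowedLateness) {
        Instant windowEnd = windowStart(eventTime, windowSize).plus(windowSize);
        Instant maxTimestamp = windowEnd.minusMillis(1); // last instant inside the window
        return watermark.isAfter(maxTimestamp.plus(allowedLateness));
    }

    public static void main(String[] args) {
        // Timestamp taken from the example message in the question.
        Instant eventTime = Instant.parse("2015-08-31T09:52:25.005Z");
        Duration windowSize = Duration.ofMinutes(1);      // assumed WINDOW_SIZE
        Duration allowedLateness = Duration.ofMinutes(5); // assumed ALLOW_LATE_SIZE

        // Watermark still within window end + allowed lateness.
        System.out.println(isDropped(eventTime,
                Instant.parse("2015-08-31T09:57:00Z"), windowSize, allowedLateness));
        // Watermark already past window end + allowed lateness.
        System.out.println(isDropped(eventTime,
                Instant.parse("2015-08-31T09:58:00Z"), windowSize, allowedLateness));
    }
}
```

Under this model, whether an old message is kept depends on where the watermark stands when the message arrives, so it may be worth checking the watermark as well as the value passed to withAllowedLateness.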
