Can BigQuery table names be generated dynamically from the timestamps of the window's elements?

Time: 2022-12-12 15:23:31

For example, suppose I have a streaming Dataflow job with a 5-minute window that reads from PubSub. I understand that if I assign an element a timestamp two days in the past, a window containing that element will exist. But if I use the example in BigQueryIO.java that outputs daily tables to BigQuery, the job writes the two-days-past element to a BigQuery table named for the current date.

I would like to write past elements to BigQuery tables with the timestamp of the elements of the window instead of the time of the current window, is it possible?
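
To illustrate the intended behavior, here is a minimal stdlib-only sketch (not the Dataflow API; the `tableFor` helper and the project/dataset names are illustrative) of deriving the daily table name from an element's own event timestamp rather than from the wall-clock time at which it is processed:

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class TableNaming {
    // Derive the daily table name from the element's event timestamp,
    // not from the wall-clock time at which the element is processed.
    static String tableFor(Instant eventTime) {
        String day = DateTimeFormatter.ofPattern("yyyy_MM_dd")
                .withZone(ZoneOffset.UTC)
                .format(eventTime);
        return "my-project:output.output_table_" + day;
    }

    public static void main(String[] args) {
        Instant arrival = Instant.parse("2015-04-03T10:00:00Z");
        Instant twoDaysPast = arrival.minus(Duration.ofDays(2));
        // The two-days-past element names the table for its own day:
        System.out.println(tableFor(twoDaysPast));
        // -> my-project:output.output_table_2015_04_01
    }
}
```

In the real pipeline, the window (and hence the table name) is selected by the timestamp attached to each element before windowing, which is exactly the behavior asked about.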

Now I'm following the example described in DataflowJavaSDK/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java:

    PCollection<TableRow> quotes = ...
    quotes.apply(Window.<TableRow>into(CalendarWindows.days(1)))
       .apply(BigQueryIO.Write
         .named("Write")
         .withSchema(schema)
         .to(new SerializableFunction<BoundedWindow, String>() {
               public String apply(BoundedWindow window) {
                 // Format the window's start date into the daily table suffix.
                 String dayString = DateTimeFormat.forPattern("yyyy_MM_dd").print(
                   ((DaysWindow) window).getStartDate());
                 return "my-project:output.output_table_" + dayString;
               }
             }));

2 Answers

#1 (5 votes)

If I understand correctly, you would like to make sure that BigQuery tables are created according to inherent timestamps of the elements (quotes), rather than wall-clock time when your pipeline runs.

TL;DR the code should already do what you want; if it's not, please post more details.

Longer explanation: One of the key innovations in Dataflow's processing model is event-time processing. This means that data processing in Dataflow is almost completely decoupled from when the processing happens - what matters is when the events being processed happened. This is a key element of enabling exactly the same code to run on batch or streaming data sources (e.g. processing real-time user click events using the same code that processes historical click logs). It also enables flexible handling of late-arriving data.

Please see The world beyond batch, the section "Event time vs. processing time" for a description of this aspect of Dataflow's processing model (the whole article is very much worth a read). For a deeper description, see the VLDB paper. This is also described in a more user-facing way in the official documentation on windowing and triggers.

Accordingly, there is no such thing as a "current window" because the pipeline may be concurrently processing many different events that happened at different times and belong to different windows. In fact, as the VLDB paper points out, one of the important parts of the execution of a Dataflow pipeline is "group elements by window".

In the pipeline you showed, we will group the records you want to write to BigQuery into windows using provided timestamps on the records, and write each window to its own table, creating the table for newly encountered windows if necessary. If late data arrives into the window (see documentation on windowing and triggers for a discussion of late data), we will append to the already existing table.
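
To make the "group elements by window" step concrete, here is a small stdlib-only sketch (not the Dataflow API; the `Event` record and `groupByDay` helper are illustrative simplifications of daily windowing) that buckets records by the UTC day of their own event timestamp, regardless of the order or wall-clock time at which they arrive:

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowGrouping {
    // A record carrying its own event-time timestamp.
    record Event(Instant timestamp, String payload) {}

    // Bucket each event into the calendar-day "window" of its own timestamp,
    // regardless of when the event is actually processed.
    static Map<LocalDate, List<String>> groupByDay(List<Event> events) {
        Map<LocalDate, List<String>> windows = new TreeMap<>();
        for (Event e : events) {
            LocalDate day = e.timestamp().atZone(ZoneOffset.UTC).toLocalDate();
            windows.computeIfAbsent(day, d -> new ArrayList<>()).add(e.payload());
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Event> arrivals = List.of(
            new Event(Instant.parse("2015-04-03T09:00:00Z"), "fresh quote"),
            new Event(Instant.parse("2015-04-01T23:59:00Z"), "two-days-late quote"));
        // Both events are processed "now", but each lands in the window
        // of its own timestamp:
        System.out.println(groupByDay(arrivals).keySet());
        // -> [2015-04-01, 2015-04-03]
    }
}
```

In the real pipeline, each such per-day group is then written to its own BigQuery table; a late arrival for an already-written day is appended to that day's existing table.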

#2 (1 vote)

The above code no longer worked for me. There is an updated example in the Google docs, however, in which DaysWindow is replaced by IntervalWindow; that version worked for me:

    PCollection<TableRow> quotes = ...
    quotes.apply(Window.<TableRow>into(CalendarWindows.days(1)))
      .apply(BigQueryIO.Write
        .named("Write")
        .withSchema(schema)
        .to(new SerializableFunction<BoundedWindow, String>() {
          public String apply(BoundedWindow window) {
            // The cast below is safe because CalendarWindows.days(1) produces IntervalWindows.
            String dayString = DateTimeFormat.forPattern("yyyy_MM_dd")
                 .withZone(DateTimeZone.UTC)
                 .print(((IntervalWindow) window).start());
            return "my-project:output.output_table_" + dayString;
          }
        }));
