数据流批量与流式传输:窗口大小大于批量大小

时间:2021-08-16 15:07:06

Let's say we have log data with timestamps that can either be streamed into BigQuery or stored as files in Google Storage, but not streamed directly to the unbounded collection source types that Dataflow supports.

假设我们有带时间戳的日志数据,这些时间戳既可以流式传输到BigQuery,也可以存储为Google Storage中的文件,但不能直接流式传输到Dataflow支持的*集合源类型。

We want to analyse this data based on timestamp, either relatively or absolutely, e.g. "how many hits in the last 1 hour?" and "how many hits between 3pm and 4pm on 5th Feb 2018"?

我们希望基于时间戳分析这些数据,无论是相对还是绝对,例如“过去1小时有多少次点击?”和“2018年2月5日下午3点到4点之间有多少点击”?

Having read the documentation on windows and triggers, it's not clear how we would divide our incoming data into batches in a way that is supported by Dataflow if we want to have a large window - potentially we want to aggregate over the last day, 30 days, 3 months, etc.

在阅读了有关Windows和触发器的文档之后,如果我们想拥有一个大窗口,我们不清楚如何以数据流支持的方式将传入数据分成批次 - 我们可能希望在最后一天汇总30天,3个月等

For example, if our batched source is a BigQuery query, run every 5 mins, for the last 5 mins worth of data, will Dataflow keep the windows open between job runs, even though the data is arriving in 5 min chunks?

例如,如果我们的批处理源是一个BigQuery查询,每隔5分钟运行一次,对于最后5分钟的数据,Dataflow会在作业运行之间保持打开窗口,即使数据是以5分钟的时间到达的吗?

Similarly, if the log files are rotated every 5 mins, and we start Dataflow as a new file is saved to the bucket, the same question applies - is the job stopped and started, and all knowledge of previous jobs discarded, or does the large window (e.g. up to a month) remain open for new events?

类似地,如果日志文件每隔5分钟轮换一次,并且我们启动Dataflow作为新文件保存到存储桶,则同样的问题适用 - 作业已停止并启动,并且所有先前作业的知识都被丢弃,或者是大的窗口(例如长达一个月)对新事件保持开放状态?

How do we change/modify this pipeline without disturbing the existing state?

我们如何在不打扰现有状态的情况下更改/修改此管道?

Apologies if these are basic questions, even a link to some docs would be appreciated.

如果这些是基本问题,请致歉,即使是某些文档的链接也会受到赞赏。

1 个解决方案

#1


3  

It sounds like you want arbitrary interactive aggregation queries on your data. Beam / Dataflow are not a good fit for this per se, however one of the most common use cases of Dataflow is to ingest data into BigQuery (e.g. from GCS files or from Pubsub), which is a very good fit for that.

听起来您希望对数据进行任意交互式聚合查询。 Beam / Dataflow本身并不适合这种情况,但Dataflow最常见的用例之一是将数据摄取到BigQuery中(例如从GCS文件或Pubsub中),这非常适合。

A few more comments on your question:

还有一些关于你的问题的评论:

it's not clear how we would divide our incoming data into batches

目前尚不清楚我们如何将我们的传入数据分成几批

Windowing in Beam is simply a way to specify the aggregation scope in the time dimension. E.g. if you're using sliding windows of size 15 minutes every 5 minutes, then a record whose event-time timestamp is 14:03 counts towards aggregations in three windows: 13:50..14:05, 13:55..14:10, 14:00..14:15.

窗口中的窗口只是一种在时间维度中指定聚合范围的方法。例如。如果你每隔5分钟使用15分钟大小的滑动窗口,那么事件时间戳为14:03的记录会计入三个窗口的聚合:13:50..14:05,13:55..14: 10,14:00..14:15。

So: same way as you don't need to divide your incoming data into "keys" when grouping by a key (the data processing framework performs the group-by-key for you), you don't divide it into windows either (the framework performs group-by-window implicitly as part of every aggregating operation).

所以:同样的方式,当您按键分组(数据处理框架为您执行分组)时,您不需要将传入的数据划分为“键”,也不要将其划分为窗口(框架作为每个聚合操作的一部分隐式执行逐个窗口。

will Dataflow keep the windows open between job runs

Dataflow会在作业运行之间保持打开窗口

I'm hoping this is addressed by the previous point, but to clarify more: No. Stopping a Dataflow job discards all of its state. However, you can "update" a job with new code (e.g. if you've fixed a bug or added an extra processing step) - in that case state is not discarded, but I think that's not what you're asking.

我希望这可以通过前一点解决,但要澄清更多:否。停止Dataflow作业会丢弃其所有状态。但是,您可以使用新代码“更新”作业(例如,如果您已修复错误或添加了额外的处理步骤) - 在这种情况下,状态不会被丢弃,但我认为这不是您所要求的。

if the log files are rotated every 5 mins, and we start Dataflow as a new file is saved

如果日志文件每隔5分钟旋转一次,我们就会在保存新文件时启动Dataflow

It sounds like you want to ingest data continuously. The way to do that is to write a single continuously running streaming pipeline that ingests the data continuously, rather than to start a new pipeline every time new data arrives. In the case of files arriving into a bucket, you can use TextIO.read().watchForNewFiles() if you're reading text files, or its various analogues if you're reading some other kind of files (most general is FileIO.matchAll().continuously()).

听起来你想连续摄取数据。这样做的方法是编写一个连续运行的流输出管道,连续摄取数据,而不是每次新数据到达时启动新的管道。在文件到达存储桶的情况下,如果您正在读取文本文件,则可以使用TextIO.read()。watchForNewFiles();如果您正在读取其他类型的文件,则可以使用其各种类似物(最常见的是FileIO。 matchAll()。连续地())。

#1


3  

It sounds like you want arbitrary interactive aggregation queries on your data. Beam / Dataflow are not a good fit for this per se, however one of the most common use cases of Dataflow is to ingest data into BigQuery (e.g. from GCS files or from Pubsub), which is a very good fit for that.

听起来您希望对数据进行任意交互式聚合查询。 Beam / Dataflow本身并不适合这种情况,但Dataflow最常见的用例之一是将数据摄取到BigQuery中(例如从GCS文件或Pubsub中),这非常适合。

A few more comments on your question:

还有一些关于你的问题的评论:

it's not clear how we would divide our incoming data into batches

目前尚不清楚我们如何将我们的传入数据分成几批

Windowing in Beam is simply a way to specify the aggregation scope in the time dimension. E.g. if you're using sliding windows of size 15 minutes every 5 minutes, then a record whose event-time timestamp is 14:03 counts towards aggregations in three windows: 13:50..14:05, 13:55..14:10, 14:00..14:15.

窗口中的窗口只是一种在时间维度中指定聚合范围的方法。例如。如果你每隔5分钟使用15分钟大小的滑动窗口,那么事件时间戳为14:03的记录会计入三个窗口的聚合:13:50..14:05,13:55..14: 10,14:00..14:15。

So: same way as you don't need to divide your incoming data into "keys" when grouping by a key (the data processing framework performs the group-by-key for you), you don't divide it into windows either (the framework performs group-by-window implicitly as part of every aggregating operation).

所以:同样的方式,当您按键分组(数据处理框架为您执行分组)时,您不需要将传入的数据划分为“键”,也不要将其划分为窗口(框架作为每个聚合操作的一部分隐式执行逐个窗口。

will Dataflow keep the windows open between job runs

Dataflow会在作业运行之间保持打开窗口

I'm hoping this is addressed by the previous point, but to clarify more: No. Stopping a Dataflow job discards all of its state. However, you can "update" a job with new code (e.g. if you've fixed a bug or added an extra processing step) - in that case state is not discarded, but I think that's not what you're asking.

我希望这可以通过前一点解决,但要澄清更多:否。停止Dataflow作业会丢弃其所有状态。但是,您可以使用新代码“更新”作业(例如,如果您已修复错误或添加了额外的处理步骤) - 在这种情况下,状态不会被丢弃,但我认为这不是您所要求的。

if the log files are rotated every 5 mins, and we start Dataflow as a new file is saved

如果日志文件每隔5分钟旋转一次,我们就会在保存新文件时启动Dataflow

It sounds like you want to ingest data continuously. The way to do that is to write a single continuously running streaming pipeline that ingests the data continuously, rather than to start a new pipeline every time new data arrives. In the case of files arriving into a bucket, you can use TextIO.read().watchForNewFiles() if you're reading text files, or its various analogues if you're reading some other kind of files (most general is FileIO.matchAll().continuously()).

听起来你想连续摄取数据。这样做的方法是编写一个连续运行的流输出管道,连续摄取数据,而不是每次新数据到达时启动新的管道。在文件到达存储桶的情况下,如果您正在读取文本文件,则可以使用TextIO.read()。watchForNewFiles();如果您正在读取其他类型的文件,则可以使用其各种类似物(最常见的是FileIO。 matchAll()。连续地())。