Filtering bounded data in Dataflow by timestamp

Time: 2022-05-28 15:34:59

In my Dataflow pipeline, I'll have two PCollection<TableRow>s that have been read from BigQuery tables. I plan to merge those two PCollections into one PCollection with a Flatten.

Since BigQuery is append-only, the goal is to overwrite the second table in BigQuery with the new PCollection (a WRITE_TRUNCATE write).

I've read through the documentation, and it's the middle steps I'm confused about. With my new PCollection, the plan is to use a Comparator DoFn to look at the max last update date and return the given row. I'm unsure if I should be using a filter transform or if I should be doing a GroupByKey and then using a filter.

All PCollection<TableRow>s will contain the same kinds of values, i.e. string, integer and timestamp. When it comes to key-value pairs, most of the Cloud Dataflow documentation uses only simple strings. Is it possible to have a key-value pair that is the entire row of the PCollection<TableRow>?

The rows would look similar to:

customerID, customerName, lastUpdateDate
0001, customerOne, 2016-06-01 00:00:00
0001, customerOne, 2016-06-11 00:00:00

In the example above, I would want to filter the PCollection so that only the second row is returned, in a PCollection that would be written to BigQuery. Also, is it possible to apply these ParDos to the third PCollection without creating a fourth?

1 answer

#1

You've asked a few questions. I have tried to answer them in isolation, but I may have misunderstood the whole scenario. If you provided some example code, it might help to clarify.

With my new PCollection, the plan is to use a Comparator DoFn to look at the max last update date and return the given row. I'm unsure if I should be using a filter transform or if I should be doing a GroupByKey and then using a filter.

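Based on your description, it seems that you want to take a PCollection of elements and, for each customerID (the key), find the most recent update to that customer's record. You don't need a separate filter: you can use the provided transforms to accomplish this via Top.perKey(1, timestampComparator), where timestampComparator compares rows only by their timestamp. The result is a PCollection<KV<K, List<V>>> in which each list holds the single most recent row for that key. (Top.largestPerKey(1) takes no comparator and relies on the values' natural ordering, so it does not fit a TableRow value.)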
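
A minimal sketch of such a timestampComparator, assuming the timestamp lives in a field named lastUpdateDate (taken from the sample rows) and is stored as a fixed-width "yyyy-MM-dd HH:mm:ss" string, a format that sorts lexicographically in chronological order. If your rows carry the timestamp in another representation, parse it before comparing. The class name LastUpdateComparator is hypothetical. It is generic over Map<String, Object> so it runs without Beam; since TableRow is itself a Map<String, Object> (through GenericJson), it could be instantiated as LastUpdateComparator<TableRow>. It implements Serializable because Top.perKey requires a serializable comparator.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Orders rows by their lastUpdateDate field only (hypothetical helper).
 * Assumption: the field holds a fixed-width "yyyy-MM-dd HH:mm:ss" string,
 * so plain string comparison matches chronological order.
 */
class LastUpdateComparator<R extends Map<String, Object>>
        implements Comparator<R>, Serializable {
    private static final long serialVersionUID = 1L;

    // Field name taken from the sample data in the question.
    static final String FIELD = "lastUpdateDate";

    @Override
    public int compare(R a, R b) {
        // Assumes every row has the field; a missing value would compare as "null".
        String ta = String.valueOf(a.get(FIELD));
        String tb = String.valueOf(b.get(FIELD));
        return ta.compareTo(tb);
    }

    // Builds a plain-Java stand-in for a TableRow.
    static Map<String, Object> row(String id, String name, String updated) {
        Map<String, Object> r = new LinkedHashMap<>();
        r.put("customerID", id);
        r.put("customerName", name);
        r.put(FIELD, updated);
        return r;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = Arrays.asList(
            row("0001", "customerOne", "2016-06-01 00:00:00"),
            row("0001", "customerOne", "2016-06-11 00:00:00"));
        Map<String, Object> latest =
            rows.stream().max(new LastUpdateComparator<Map<String, Object>>()).get();
        System.out.println(latest.get(FIELD)); // prints: 2016-06-11 00:00:00
    }
}
```

In a pipeline, this would presumably be applied to a PCollection<KV<String, TableRow>> as Top.perKey(1, new LastUpdateComparator<TableRow>()); that Beam call is not exercised by the sketch above.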

Is it possible to have a key value pair that is the entire row of the PCollection?

A KV<K, V> can have any type for the key (K) and the value (V). If you want to group by key, the coder for the keys needs to be deterministic. TableRowJsonCoder is not deterministic, because a TableRow may contain arbitrary objects, so a whole TableRow is not a good grouping key. But it sounds like you want the customerID as the key and the entire TableRow as the value, which is fine: only the key coder has to be deterministic, and a String customerID (StringUtf8Coder) is.
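
To show what keying by customerID and keeping only the latest row per key produces, here is a plain-Java model with no Beam dependency: each row is a Map<String, Object> standing in for a TableRow, the key is the String customerID, the value is the whole row, and Collectors.toMap with BinaryOperator.maxBy keeps the later row whenever two rows share a key. The class name LatestPerCustomer is hypothetical, the field names come from the sample data, and the third customer row is invented for illustration. In Beam, the keying step would be a transform such as WithKeys or a MapElements producing KV<String, TableRow>, followed by Top.perKey(1, comparator).

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

class LatestPerCustomer {
    // Plain-Java stand-in for a TableRow: column name -> value.
    static Map<String, Object> row(String id, String name, String updated) {
        Map<String, Object> r = new LinkedHashMap<>();
        r.put("customerID", id);
        r.put("customerName", name);
        r.put("lastUpdateDate", updated);
        return r;
    }

    // Orders rows by lastUpdateDate; assumes fixed-width "yyyy-MM-dd HH:mm:ss" strings.
    static final Comparator<Map<String, Object>> BY_UPDATE =
        Comparator.comparing(r -> (String) r.get("lastUpdateDate"));

    /** Keys each row by customerID and keeps only the most recent row per key. */
    static Map<String, Map<String, Object>> latestPerKey(List<Map<String, Object>> rows) {
        return rows.stream().collect(Collectors.toMap(
            r -> (String) r.get("customerID"), // key: the String customerID
            r -> r,                            // value: the entire row
            BinaryOperator.maxBy(BY_UPDATE),   // same key seen twice: keep the later row
            LinkedHashMap::new));              // preserve first-seen key order
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = Arrays.asList(
            row("0001", "customerOne", "2016-06-01 00:00:00"),
            row("0001", "customerOne", "2016-06-11 00:00:00"),
            row("0002", "customerTwo", "2016-05-30 12:00:00"));
        latestPerKey(rows).forEach((id, r) ->
            System.out.println(id + " -> " + r.get("lastUpdateDate")));
        // prints: 0001 -> 2016-06-11 00:00:00
        // prints: 0002 -> 2016-05-30 12:00:00
    }
}
```

For the sample data, only the second 0001 row survives, which is the row the question wants written back to BigQuery.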

Is it possible to apply these ParDos to the third PCollection without creating a fourth?

When you apply a PTransform to a PCollection, it results in a new PCollection. There is no way around that, and you don't need to try to minimize the number of PCollections in your pipeline.

A PCollection is a conceptual object; it has no intrinsic cost. Your pipeline is going to be heavily optimized (for example, consecutive ParDo steps are fused together), so many intermediate PCollections, especially those in a sequence of ParDo transforms, will never be materialized at all.
