How to join BQ tables on two or more keys using Cloud Dataflow?

Time: 2021-09-14 15:35:37

I have two tables, A and B. Both of them have the fields session_id and cookie_id. How do I create a joined table output, joining A with B on session_id and cookie_id, with the help of a Dataflow pipeline? The CoGroupByKey method only lets you join on a single key, and I couldn't find anything helpful in the documentation either.


2 Solutions

#1


4  

To expand on user9720010's answer: you can create a composite key by mapping each row to a combination of its session_id and cookie_id values. This pattern is explained in the common Dataflow use-case patterns blog. Assuming you are reading from BigQuery, you can do something similar to the following:


import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptors;

Pipeline pipeline = Pipeline.create(options);

// Create tuple tags for the value types in each collection.
final TupleTag<TableRow> table1Tag = new TupleTag<>();
final TupleTag<TableRow> table2Tag = new TupleTag<>();

// Transform for keying table rows by session_id and cookie_id
WithKeys<String, TableRow> sessionAndCookieKeys = WithKeys.of(
    (TableRow row) ->
        String.format("%s#%s",
            row.get("session_id"),
            row.get("cookie_id")))
    .withKeyType(TypeDescriptors.strings());

/*
 * Steps:
 *  1) Read table 1's rows
 *  2) Read table 2's rows
 *  3) Map each row to a composite key
 *  4) Join on the composite key
 *  5) Process the results
 */
PCollection<KV<String, TableRow>> table1Rows = pipeline
    .apply(
        "ReadTable1",
        BigQueryIO
            .readTableRows()
            .from(options.getTable1()))
    .apply("WithKeys", sessionAndCookieKeys);

PCollection<KV<String, TableRow>> table2Rows = pipeline
    .apply(
        "ReadTable2",
        BigQueryIO
            .readTableRows()
            .from(options.getTable2()))
    .apply("WithKeys", sessionAndCookieKeys);

//Merge collection values into a CoGbkResult collection
PCollection<KV<String, CoGbkResult>> coGbkResult = KeyedPCollectionTuple
    .of(table1Tag, table1Rows)
    .and(table2Tag, table2Rows)
    .apply("JoinOnSessionAndCookie", CoGroupByKey.create());

// Process the results: emit one merged row per matching pair (inner join)
coGbkResult.apply(
    "ProcessResults",
    ParDo.of(new DoFn<KV<String, CoGbkResult>, TableRow>() {
      @ProcessElement
      public void processElement(ProcessContext context) {
        CoGbkResult result = context.element().getValue();
        // getAll(tag) returns every row from that table sharing this key
        for (TableRow row1 : result.getAll(table1Tag)) {
          for (TableRow row2 : result.getAll(table2Tag)) {
            TableRow joined = new TableRow();
            joined.putAll(row1);
            joined.putAll(row2);
            context.output(joined);
          }
        }
      }
    }));

#2


2  

One approach I follow in such situations is to create an ad hoc key that is the combination of the two keys. After reading the data, while converting it to key-value pairs, I would output session_id$cookie_id as a single concatenated string. Here $ can be any delimiter that does not appear in the character set of either key. The delimiter can also be omitted.

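The delimiter matters more than it may seem: without one, distinct key pairs can collide and unrelated rows get joined. A minimal stand-alone sketch of the idea (compositeKey is a hypothetical helper, not part of the Beam API):

```java
public class CompositeKeys {
  // Concatenate key fields into a single composite key string.
  static String compositeKey(String delimiter, String... fields) {
    return String.join(delimiter, fields);
  }

  public static void main(String[] args) {
    // With a delimiter, distinct (session_id, cookie_id) pairs stay distinct:
    System.out.println(compositeKey("#", "ab", "c"));  // ab#c
    System.out.println(compositeKey("#", "a", "bc"));  // a#bc
    // Without one, they collide, silently joining unrelated rows:
    System.out.println(compositeKey("", "ab", "c"));   // abc
    System.out.println(compositeKey("", "a", "bc"));   // abc
  }
}
```

So omitting the delimiter is only safe when the first field has a fixed length or its character set guarantees no ambiguity.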
