如何使用Apache Beam Python SDK在给定密钥的两个Source上执行“diff”?

时间:2023-01-28 15:34:38

I posed the question generically, because maybe it is a generic answer. But a specific example is comparing 2 BigQuery tables with the same schema, but potentially different data. I want a diff, i.e. what was added, deleted, modified, with respect to a composite key, e.g. the first 2 columns.

我提出了一般性的问题,因为它可能是一个通用的答案。但是一个具体的例子是将2个BigQuery表与相同的模式进行比较,但可能会有不同的数据。我想要一个差异,即关于复合密钥添加,删除,修改的内容,例如,前两列。

Table A
C1  C2  C3
-----------
a   a   1
a   b   1
a   c   1

Table B     
C1  C2  C3  # Notes if comparing B to A
-------------------------------------
a   a   1   # No Change to the key a + a
a   b   2   # Key a + b Changed from 1 to 2
            # Deleted key a + c with value 1
a   d   1   # Added key a + d

I basically want to be able to make/report the comparison notes. Or from a Beam perspective I may want to Just output up to 4 labeled PCollections: Unchanged, Changed, Added, Deleted. How do I do this and what would the PCollections look like?

我基本上希望能够制作/报告比较说明。或者从Beam的角度来看,我可能想要输出最多4个标记的PCollections:不变,更改,添加,删除。我该怎么做以及PCollections会是什么样子?

1 个解决方案

#1


0  

What you want to do here, basically, is join two tables and compare the result of that, right? You can look at my answer to this question, to see the two ways in which you can join two tables (Side inputs, or CoGroupByKey).

你想在这里做什么,基本上是加入两个表并比较结果,对吧?您可以查看我对这个问题的回答,看看您可以通过两种方式连接两个表(Side输入或CoGroupByKey)。

I'll also code a solution for your problem using CoGroupByKey. I'm writing the code in Python because I'm more familiar with the Python SDK, but you'd implement similar logic in Java:

我还将使用CoGroupByKey为您的问题编写解决方案。我正在用Python编写代码,因为我对Python SDK比较熟悉,但你在Java中实现了类似的逻辑:

def make_kv_pair(x):
  """ Output the record with the x[0]+x[1] key added."""
  return ((x[0], x[1]), x)

table_a = (p | 'ReadTableA' >> beam.Read(beam.io.BigQuerySource(....))
            | 'SetKeysA' >> beam.Map(make_kv_pair)
table_b = (p | 'ReadTableB' >> beam.Read(beam.io.BigQuerySource(....))
            | 'SetKeysB' >> beam.Map(make_kv_pair))

joined_tables = ({'table_a': table_a, 'table_b': table_b}
                 | beam.CoGroupByKey())


output_types = ['changed', 'added', 'deleted', 'unchanged']
class FilterDoFn(beam.DoFn):
  def process((key, values)):
    table_a_value = list(values['table_a'])
    table_b_value = list(values['table_b'])
    if table_a_value == table_b_value:
      yield pvalue.TaggedOutput('unchanged', key)
    elif len(table_a_value) < len(table_b_value):
      yield pvalue.TaggedOutput('added', key)
    elif len(table_a_value) > len(table_b_value):
      yield pvalue.TaggedOutput('removed', key)
    elif table_a_value != table_b_value:
      yield pvalue.TaggedOutput('changed', key)

key_collections = (joined_tables 
                   | beam.ParDo(FilterDoFn()).with_outputs(*output_types))

# Now you can handle each output
key_collections.unchanged | WriteToText(...)
key_collections.changed | WriteToText(...)
key_collections.added | WriteToText(...)
key_collections.removed | WriteToText(...)

#1


0  

What you want to do here, basically, is join two tables and compare the result of that, right? You can look at my answer to this question, to see the two ways in which you can join two tables (Side inputs, or CoGroupByKey).

你想在这里做什么,基本上是加入两个表并比较结果,对吧?您可以查看我对这个问题的回答,看看您可以通过两种方式连接两个表(Side输入或CoGroupByKey)。

I'll also code a solution for your problem using CoGroupByKey. I'm writing the code in Python because I'm more familiar with the Python SDK, but you'd implement similar logic in Java:

我还将使用CoGroupByKey为您的问题编写解决方案。我正在用Python编写代码,因为我对Python SDK比较熟悉,但你在Java中实现了类似的逻辑:

def make_kv_pair(x):
  """ Output the record with the x[0]+x[1] key added."""
  return ((x[0], x[1]), x)

table_a = (p | 'ReadTableA' >> beam.Read(beam.io.BigQuerySource(....))
            | 'SetKeysA' >> beam.Map(make_kv_pair)
table_b = (p | 'ReadTableB' >> beam.Read(beam.io.BigQuerySource(....))
            | 'SetKeysB' >> beam.Map(make_kv_pair))

joined_tables = ({'table_a': table_a, 'table_b': table_b}
                 | beam.CoGroupByKey())


output_types = ['changed', 'added', 'deleted', 'unchanged']
class FilterDoFn(beam.DoFn):
  def process((key, values)):
    table_a_value = list(values['table_a'])
    table_b_value = list(values['table_b'])
    if table_a_value == table_b_value:
      yield pvalue.TaggedOutput('unchanged', key)
    elif len(table_a_value) < len(table_b_value):
      yield pvalue.TaggedOutput('added', key)
    elif len(table_a_value) > len(table_b_value):
      yield pvalue.TaggedOutput('removed', key)
    elif table_a_value != table_b_value:
      yield pvalue.TaggedOutput('changed', key)

key_collections = (joined_tables 
                   | beam.ParDo(FilterDoFn()).with_outputs(*output_types))

# Now you can handle each output
key_collections.unchanged | WriteToText(...)
key_collections.changed | WriteToText(...)
key_collections.added | WriteToText(...)
key_collections.removed | WriteToText(...)