Apache Beam ETL维表加载,有例子吗?

时间:2022-11-19 15:36:27

I am thinking of Loading File into one Dimension table. My solution is:

我正在考虑将文件加载到一维表中。我的解决方案是:

  1. Beam.read the file
  2. 光束。读取文件
  3. Create the side input from the DB about existing data.
  4. 创建来自DB的关于现有数据的边输入。
  5. in a ParDo: filter the records which are already in the side input
  6. 在ParDo中:过滤已经在边输入中的记录
  7. biquerySink into DB.
  8. biquerySink DB。

and want to inquire if someone has implement this ? and can you give me some example for this ? Thanks

想知道是否有人实现了这个?你能给我举个例子吗?谢谢

can you give me some example about coGroupByKey. I understand that it may look like below : Sorry,I am newbie to Dataflow,and watching codes is the best way to me 

step 1: sourcedata = beam.ReadFromText(...)
step 2: existing_table = beam.pvalue.AsDict(p
                                    | beam.Read(beam.BigQuerySource(my_query)
                                    | beam.Map(format_rows)

I assume the structure of sourcedata and existing data is the same :<k,v>                       
step 3:  source_existing_Data=  {sourcedata,existing_table}
                                |'coGroupBy' >> beam.coGroupByKey()


step4:  new_Data = source_existing_Data | beam.filter(lamada (name,(existing,source)):source is NONE))

step 5:  bigQuerySink(new_Data)

2 个解决方案

#1


0  

Side inputs are a good option for this, but consider that if your DB table is pretty large, you may find later that CoGroupByKey is a better option. To implement this in side inputs, you'd do the following:

侧输入是一个很好的选择,但是请考虑如果您的DB表相当大,您稍后可能会发现CoGroupByKey是一个更好的选择。要在侧输入中实现这一点,您需要做以下工作:

p = beam.Pipeline(..)
existing_table = beam.pvalue.AsDict(p
                                    | beam.Read(beam.io.BigQuerySource(my_query)
                                    | beam.Map(format_rows))

class FilterRowsDoFn(beam.DoFn):
  def process(self, elem, table_dict):
    k = elem[0]
    if k not in table_dict:
      yield elem

result = (p
          | beam.ReadFromText(...)
          | beam.ParDo(FilterRowsDoFn(), table_dict=existing_table))

And then you can write the result to BQ. But, again, if your table already contains many elements, you may want to consider using CoGroupByKey.

然后你可以把结果写在BQ上。但是,如果您的表已经包含许多元素,您可能需要考虑使用CoGroupByKey。


The code to accomplish this using CoGroupByKey should look something like this:

使用CoGroupByKey完成此任务的代码应该如下所示:

sourcedata = (p 
              | beam.ReadFromText(...)
              | beam.Map(format_text))
existing_table = (p
                  | beam.Read(beam.io.BigQuerySource(my_query)
                  | beam.Map(format_rows))

source_existing_data = ((sourcedata, existing_table)
                        | 'coGroupBy' >> beam.coGroupByKey())

new_data = (source_existing_data 
            | beam.Filter(lamada (name, (source, existing)): not list(source))
            | beam.FlatMap(lambda (name, (source, existing)): [(name, s) for s in source]))

result = new_data | bigQuerySink(new_Data)

Let me know if you have any trouble using either of the code snippets so I'll fix them up.

如果您在使用这两个代码片段时遇到任何问题,请告诉我,我将修复它们。

#2


0  

For the row coming from the text file and row coming form BIGQUERY needed to be done with function :
    from GCPUtil import BuildTupleRowFn as BuildTupleRowFn
    from GCPUtil import BuildDictTupleRowFn as BuildDictTupleRowFn
and also the new data also after coGroupKey and Filter also need to convert since what get from coGroupKey is Tuple, so need to convert it from Dict or List.

Below is the detailed codes:

#####################################################################
#   Develop by Emma 2017/08/19
#####################################################################

    import argparse
    import logging
    from random import randrange

    import apache_beam as beam

    from apache_beam.io import WriteToText
    from apache_beam.pvalue import AsList
    from apache_beam.pvalue import AsSingleton
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import GoogleCloudOptions
    from apache_beam.options.pipeline_options import StandardOptions
    import sys
    sys.path.append("..")
    from GCPUtil import BuildTupleRowFn as BuildTupleRowFn
    from GCPUtil import BuildDictTupleRowFn as BuildDictTupleRowFn

    def configure_bigquery_write():
        return [
            ('CAND_ID', 'STRING'),
            ('CAND_NAME', 'STRING'),
        ]


    class BuildRowFn(beam.DoFn):
        def process(self, element):
            row = {}
            for entry in element:
                print('start')
                print(entry)
                # print(entry[0])
                # print(entry[1])
                print('end')
                row['CAND_ID'] = entry[0]
                row['CAND_NAME'] = entry[1]
                yield row



    def run(argv=None):
        """Run the workflow."""

        # schema = 'CAND_ID:STRING,CAND_NAME:STRING'
        schema = 'CAND_ID:STRING,CAND_NAME:STRING'
        parser = argparse.ArgumentParser()
        parser.add_argument('--input', default=r'd:/resource/test*')
        parser.add_argument('--output', default=r'd:/output/test/new_emma')

        # parser.add_argument('--project', default='chinarose_project')
        known_args, pipeline_args = parser.parse_known_args(argv)
        pipeline_options = PipelineOptions(pipeline_args)
        pipeline_options.view_as(StandardOptions).runner = 'DirectRunner'
        pipeline_options.view_as(GoogleCloudOptions).project = 'chinarose_project'
        # query = 'select store FROM [chinarose_project:emma_test.sales]'
        query = 'select CAND_ID ,CAND_NAME from emma_test.campaign'
        p = beam.Pipeline(options=pipeline_options)

        # get the length of the word and write them in the text file,noticed the UDF

        source_data = (p | beam.io.ReadFromText(known_args.input)
                       | beam.Map(lambda a: a.split(","))
                       | beam.ParDo(BuildTupleRowFn())
                       )
        # source_data | 'write' >> WriteToText(known_args.output)
        # source_data | WriteToText(known_args.output)


        print("connect to BQ")
        existing_data= (p | beam.io.Read(beam.io.BigQuerySource(query=query, project='chinarose_project'))
                          | beam.ParDo(BuildDictTupleRowFn())

                        )
        #existing_data | WriteToText(known_args.output)

        source_existing_data = ((source_data, existing_data)
                                | 'GoGroupBy' >> beam.CoGroupByKey())

        # source_existing_data |'write to text' >> WriteToText(known_args.output)
        new_data = (source_existing_data | beam.Filter(lambda (name, (source, existing)): len(existing) == 0)
                    | beam.Map(lambda (name, (source, existing)): [(name, s) for s in source])
                    | beam.ParDo(BuildRowFn())
                    | beam.io.Write(beam.io.BigQuerySink(table='campaign_emma_v2',  dataset='emma_test',project='chinarose_project',schema=schema))
                    )

        #new_data | 'write to text' >> WriteToText(known_args.output)


        p.run().wait_until_finish()

      if __name__ == '__main__':
        # logging.getLogger().setLevel(logging.INFO)
        print('begin')
        run()
        print('end')

#1


0  

Side inputs are a good option for this, but consider that if your DB table is pretty large, you may find later that CoGroupByKey is a better option. To implement this in side inputs, you'd do the following:

侧输入是一个很好的选择,但是请考虑如果您的DB表相当大,您稍后可能会发现CoGroupByKey是一个更好的选择。要在侧输入中实现这一点,您需要做以下工作:

p = beam.Pipeline(..)
existing_table = beam.pvalue.AsDict(p
                                    | beam.Read(beam.io.BigQuerySource(my_query)
                                    | beam.Map(format_rows))

class FilterRowsDoFn(beam.DoFn):
  def process(self, elem, table_dict):
    k = elem[0]
    if k not in table_dict:
      yield elem

result = (p
          | beam.ReadFromText(...)
          | beam.ParDo(FilterRowsDoFn(), table_dict=existing_table))

And then you can write the result to BQ. But, again, if your table already contains many elements, you may want to consider using CoGroupByKey.

然后你可以把结果写在BQ上。但是,如果您的表已经包含许多元素,您可能需要考虑使用CoGroupByKey。


The code to accomplish this using CoGroupByKey should look something like this:

使用CoGroupByKey完成此任务的代码应该如下所示:

sourcedata = (p 
              | beam.ReadFromText(...)
              | beam.Map(format_text))
existing_table = (p
                  | beam.Read(beam.io.BigQuerySource(my_query)
                  | beam.Map(format_rows))

source_existing_data = ((sourcedata, existing_table)
                        | 'coGroupBy' >> beam.coGroupByKey())

new_data = (source_existing_data 
            | beam.Filter(lamada (name, (source, existing)): not list(source))
            | beam.FlatMap(lambda (name, (source, existing)): [(name, s) for s in source]))

result = new_data | bigQuerySink(new_Data)

Let me know if you have any trouble using either of the code snippets so I'll fix them up.

如果您在使用这两个代码片段时遇到任何问题,请告诉我,我将修复它们。

#2


0  

For the row coming from the text file and row coming form BIGQUERY needed to be done with function :
    from GCPUtil import BuildTupleRowFn as BuildTupleRowFn
    from GCPUtil import BuildDictTupleRowFn as BuildDictTupleRowFn
and also the new data also after coGroupKey and Filter also need to convert since what get from coGroupKey is Tuple, so need to convert it from Dict or List.

Below is the detailed codes:

#####################################################################
#   Develop by Emma 2017/08/19
#####################################################################

    import argparse
    import logging
    from random import randrange

    import apache_beam as beam

    from apache_beam.io import WriteToText
    from apache_beam.pvalue import AsList
    from apache_beam.pvalue import AsSingleton
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import GoogleCloudOptions
    from apache_beam.options.pipeline_options import StandardOptions
    import sys
    sys.path.append("..")
    from GCPUtil import BuildTupleRowFn as BuildTupleRowFn
    from GCPUtil import BuildDictTupleRowFn as BuildDictTupleRowFn

    def configure_bigquery_write():
        return [
            ('CAND_ID', 'STRING'),
            ('CAND_NAME', 'STRING'),
        ]


    class BuildRowFn(beam.DoFn):
        def process(self, element):
            row = {}
            for entry in element:
                print('start')
                print(entry)
                # print(entry[0])
                # print(entry[1])
                print('end')
                row['CAND_ID'] = entry[0]
                row['CAND_NAME'] = entry[1]
                yield row



    def run(argv=None):
        """Run the workflow."""

        # schema = 'CAND_ID:STRING,CAND_NAME:STRING'
        schema = 'CAND_ID:STRING,CAND_NAME:STRING'
        parser = argparse.ArgumentParser()
        parser.add_argument('--input', default=r'd:/resource/test*')
        parser.add_argument('--output', default=r'd:/output/test/new_emma')

        # parser.add_argument('--project', default='chinarose_project')
        known_args, pipeline_args = parser.parse_known_args(argv)
        pipeline_options = PipelineOptions(pipeline_args)
        pipeline_options.view_as(StandardOptions).runner = 'DirectRunner'
        pipeline_options.view_as(GoogleCloudOptions).project = 'chinarose_project'
        # query = 'select store FROM [chinarose_project:emma_test.sales]'
        query = 'select CAND_ID ,CAND_NAME from emma_test.campaign'
        p = beam.Pipeline(options=pipeline_options)

        # get the length of the word and write them in the text file,noticed the UDF

        source_data = (p | beam.io.ReadFromText(known_args.input)
                       | beam.Map(lambda a: a.split(","))
                       | beam.ParDo(BuildTupleRowFn())
                       )
        # source_data | 'write' >> WriteToText(known_args.output)
        # source_data | WriteToText(known_args.output)


        print("connect to BQ")
        existing_data= (p | beam.io.Read(beam.io.BigQuerySource(query=query, project='chinarose_project'))
                          | beam.ParDo(BuildDictTupleRowFn())

                        )
        #existing_data | WriteToText(known_args.output)

        source_existing_data = ((source_data, existing_data)
                                | 'GoGroupBy' >> beam.CoGroupByKey())

        # source_existing_data |'write to text' >> WriteToText(known_args.output)
        new_data = (source_existing_data | beam.Filter(lambda (name, (source, existing)): len(existing) == 0)
                    | beam.Map(lambda (name, (source, existing)): [(name, s) for s in source])
                    | beam.ParDo(BuildRowFn())
                    | beam.io.Write(beam.io.BigQuerySink(table='campaign_emma_v2',  dataset='emma_test',project='chinarose_project',schema=schema))
                    )

        #new_data | 'write to text' >> WriteToText(known_args.output)


        p.run().wait_until_finish()

      if __name__ == '__main__':
        # logging.getLogger().setLevel(logging.INFO)
        print('begin')
        run()
        print('end')