Grouping elements in an Apache Beam pipeline

Time: 2021-03-03 15:22:49

I have got a pipeline that parses records from AVRO files.


I need to split the incoming records into chunks of 500 items in order to call an API that takes multiple inputs at the same time.


Is there a way to do this with the Python SDK?


1 solution

#1



I'm supposing that you mean a Batch use case. You have a couple options for this:


If your PCollection is large enough, and you have some flexibility on the size of your bundles, you can use a GroupByKey transform after assigning keys in random/round robin order to your elements. e.g.:


import apache_beam as beam
from random import randint

my_collection = p | ReadRecordsFromAvro()

element_bundles = (my_collection
                     # Choose a number of keys that works for you (I chose 50 here)
                   | 'AddKeys' >> beam.Map(lambda x: (randint(0, 50), x))
                   | 'MakeBundles' >> beam.GroupByKey()
                     # Python 3 removed tuple-unpacking lambdas, so index instead
                   | 'DropKeys' >> beam.Map(lambda kv: kv[1])
                   | beam.ParDo(ProcessBundlesDoFn()))

Where ProcessBundlesDoFn is something like this:

class ProcessBundlesDoFn(beam.DoFn):
  def process(self, bundle):
    # Emit the grouped values in batches of at most 500
    batch = []
    for element in bundle:
      batch.append(element)
      if len(batch) == 500:
        yield batch
        batch = []
    if batch:
      yield batch
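The batching step is ordinary Python, so it can be factored into a standalone helper and sanity-checked without running a pipeline. This is a sketch independent of Beam; the name `chunk` is my own:

```python
def chunk(iterable, size):
    """Yield lists of at most `size` consecutive elements from `iterable`."""
    batch = []
    for item in iterable:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:  # emit the final, possibly short, batch
        yield batch
```

With `size=500` this reproduces the DoFn's behavior; for example, `list(chunk(range(12), 5))` yields three batches of sizes 5, 5 and 2.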

If you need all bundles to have exactly 500 elements, then you may need to:

  1. Count the number of elements in your PCollection.
  2. Pass that count as a singleton side input to your 'AddKeys' ParDo, to determine exactly the number of keys that you will need.
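The key computation behind those steps can be sketched in plain Python. The function below is hypothetical; in a real pipeline the count would typically come from `beam.combiners.Count.Globally()` and be passed via `beam.pvalue.AsSingleton`:

```python
import math
import random

def assign_key(element, total_count, batch_size=500):
    """Hypothetical key function: pick one of ceil(total_count / batch_size)
    keys so that each group averages roughly batch_size elements."""
    num_keys = max(1, math.ceil(total_count / batch_size))
    return (random.randint(0, num_keys - 1), element)
```

In the pipeline this would replace the fixed-range 'AddKeys' step, e.g. `beam.Map(assign_key, beam.pvalue.AsSingleton(count))`. Note that random keys give groups of about 500 on average, not exactly 500, so the DoFn above still has to split any oversized group.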

Hope that helps.

