使用python在apache梁中的PCollection中的几个字段的最大值和最小值

时间:2022-01-05 15:34:15

I am using apache beam via python SDK and have the following problem:

我通过python SDK使用apache beam并遇到以下问题:

I have a PCollection with approximately 1 mln entries, each entry in a PCollection looks like a list of 2-tuples [(key1,value1),(key2,value2),...] with length ~150. I need to find max and min values across all entries of the PCollection for each key in order normalize the values.

我有一个带有大约1个条目的PCollection,PCollection中的每个条目看起来像一个长度为〜150的2元组[(key1,value1),(key2,value2),...]的列表。我需要在每个键的PCollection的所有条目中找到最大值和最小值,以便对值进行标准化。

Ideally, it will be good to obtain PCollection with a list of tuples [(key,max_value,min_value),...] and then it will be easy to proceed with normalization to get [(key1,norm_value1),(key2,norm_value2),...], where norm_value = (value - min) / (max - min)

理想情况下,获取带有元组列表[(key,max_value,min_value),...]的PCollection将是很好的,然后很容易进行规范化以获得[(key1,norm_value1),(key2,norm_value2) ),...],其中norm_value =(value - min)/(max - min)

At the moment I can do it only separately for each key by hands, which is not very convenient and not sustainable, so any suggestions will be helpful.

目前我只能用手分别为每个按键单独做,这不是很方便,也不可持续,所以任何建议都会有所帮助。

1 个解决方案

#1


1  

I decided to give it a go using a custom CombineFn function to determine the minimum and maximum per each key. Then, join them with the input data using CoGroupByKey and apply the desired mapping to normalize the values.

我决定使用自定义CombineFn函数来确定每个键的最小值和最大值。然后,使用CoGroupByKey将它们与输入数据连接,并应用所需的映射来规范化值。

"""Normalize PCollection values."""

import logging
import argparse
import sys

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


# custom CombineFn that outputs min and max value
class MinMaxFn(beam.CombineFn):
  # initialize min and max values (I assumed int type)
  def create_accumulator(self):
    return (sys.maxint, 0)

  # update if current value is a new min or max      
  def add_input(self, min_max, input):
    (current_min, current_max) = min_max
    return min(current_min, input), max(current_max, input)

  def merge_accumulators(self, accumulators):
    return accumulators

  def extract_output(self, min_max):
    return min_max


def run(argv=None):
  """Main entry point; defines and runs the pipeline."""
  parser = argparse.ArgumentParser()
  parser.add_argument('--output',
                      dest='output',
                      required=True,
                      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  p = beam.Pipeline(options=pipeline_options)

  # create test data
  pc = [('foo', 1), ('bar', 5), ('foo', 5), ('bar', 9), ('bar', 2)]

  # first run through data to apply custom combineFn and determine min/max per key
  minmax = pc | 'Determine Min Max' >> beam.CombinePerKey(MinMaxFn())

  # group input data by key and append corresponding min and max 
  merged = (pc, minmax) | 'Join Pcollections' >> beam.CoGroupByKey()

  # apply mapping to normalize values according to 'norm_value = (value - min) / (max - min)'
  normalized = merged | 'Normalize values' >> beam.Map(lambda (a, (b, c)): (a, [float(val - c[0][0][0])/(c[0][0][1] -c[0][0][0]) for val in b]))

  # write results to output file
  normalized | 'Write results' >> WriteToText(known_args.output)

  result = p.run()
  result.wait_until_finish()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

The snippet can be run with python SCRIPT_NAME.py --output OUTPUT_FILENAME. My test data, grouped by key, is:

可以使用python SCRIPT_NAME.py --output OUTPUT_FILENAME运行该代码段。按键分组的测试数据是:

('foo', [1, 5])
('bar', [5, 9, 2])

The CombineFn will return per key min and max:

CombineFn将按键min和max返回:

('foo', [(1, 5)])
('bar', [(2, 9)])

The output of the join/cogroup by key operation:

按键操作的join / cogroup输出:

('foo', ([1, 5], [[(1, 5)]]))
('bar', ([5, 9, 2], [[(2, 9)]]))

And after normalizing:

正常化后:

('foo', [0.0, 1.0])
('bar', [0.42857142857142855, 1.0, 0.0])

This was just a simple test so I’m sure it can be optimized for the mentioned volume of data but it seems to work as a starting point. Take into account that further considerations might be needed (i.e. avoid dividing by zero if min = max)

这只是一个简单的测试,所以我确信它可以针对上面提到的数据量进行优化,但它似乎是一个起点。考虑到可能需要进一步考虑(即如果min = max,则避免除以零)

#1


1  

I decided to give it a go using a custom CombineFn function to determine the minimum and maximum per each key. Then, join them with the input data using CoGroupByKey and apply the desired mapping to normalize the values.

我决定使用自定义CombineFn函数来确定每个键的最小值和最大值。然后,使用CoGroupByKey将它们与输入数据连接,并应用所需的映射来规范化值。

"""Normalize PCollection values."""

import logging
import argparse
import sys

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


# custom CombineFn that outputs min and max value
class MinMaxFn(beam.CombineFn):
  # initialize min and max values (I assumed int type)
  def create_accumulator(self):
    return (sys.maxint, 0)

  # update if current value is a new min or max      
  def add_input(self, min_max, input):
    (current_min, current_max) = min_max
    return min(current_min, input), max(current_max, input)

  def merge_accumulators(self, accumulators):
    return accumulators

  def extract_output(self, min_max):
    return min_max


def run(argv=None):
  """Main entry point; defines and runs the pipeline."""
  parser = argparse.ArgumentParser()
  parser.add_argument('--output',
                      dest='output',
                      required=True,
                      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  p = beam.Pipeline(options=pipeline_options)

  # create test data
  pc = [('foo', 1), ('bar', 5), ('foo', 5), ('bar', 9), ('bar', 2)]

  # first run through data to apply custom combineFn and determine min/max per key
  minmax = pc | 'Determine Min Max' >> beam.CombinePerKey(MinMaxFn())

  # group input data by key and append corresponding min and max 
  merged = (pc, minmax) | 'Join Pcollections' >> beam.CoGroupByKey()

  # apply mapping to normalize values according to 'norm_value = (value - min) / (max - min)'
  normalized = merged | 'Normalize values' >> beam.Map(lambda (a, (b, c)): (a, [float(val - c[0][0][0])/(c[0][0][1] -c[0][0][0]) for val in b]))

  # write results to output file
  normalized | 'Write results' >> WriteToText(known_args.output)

  result = p.run()
  result.wait_until_finish()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

The snippet can be run with python SCRIPT_NAME.py --output OUTPUT_FILENAME. My test data, grouped by key, is:

可以使用python SCRIPT_NAME.py --output OUTPUT_FILENAME运行该代码段。按键分组的测试数据是:

('foo', [1, 5])
('bar', [5, 9, 2])

The CombineFn will return per key min and max:

CombineFn将按键min和max返回:

('foo', [(1, 5)])
('bar', [(2, 9)])

The output of the join/cogroup by key operation:

按键操作的join / cogroup输出:

('foo', ([1, 5], [[(1, 5)]]))
('bar', ([5, 9, 2], [[(2, 9)]]))

And after normalizing:

正常化后:

('foo', [0.0, 1.0])
('bar', [0.42857142857142855, 1.0, 0.0])

This was just a simple test so I’m sure it can be optimized for the mentioned volume of data but it seems to work as a starting point. Take into account that further considerations might be needed (i.e. avoid dividing by zero if min = max)

这只是一个简单的测试,所以我确信它可以针对上面提到的数据量进行优化,但它似乎是一个起点。考虑到可能需要进一步考虑(即如果min = max,则避免除以零)