How do I merge the JSON data rows as shown below using the merge function below with pyspark?
Note: Assume this is just a minimal example; I have thousands of rows of data to merge. What is the most performant solution? For better or for worse, I must use pyspark.
Input:
data = [
    {'timestamp': '20080411204445', 'address': '100 Sunder Ct', 'name': 'Joe Schmoe'},
    {'timestamp': '20040218165319', 'address': '100 Lee Ave', 'name': 'Joe Schmoe'},
    {'timestamp': '20120309173318', 'address': '1818 Westminster', 'name': 'John Doe'},
    ... More ...
]
Desired Output:
combined_result = [
    {'name': 'Joe Schmoe', 'addresses': [('20080411204445', '100 Sunder Ct'), ('20040218165319', '100 Lee Ave')]},
    {'name': 'John Doe', 'addresses': [('20120309173318', '1818 Westminster')]},
    ... More ...
]
Merge function:
def reduce_on_name(a, b):
    '''Combines two JSON data rows based on name'''
    merged = {}
    if a['name'] == b['name']:
        addresses = (a['timestamp'], a['address']), (b['timestamp'], b['address'])
        merged['name'] = a['name']
        merged['addresses'] = addresses
    return merged
2 Answers
#1
I think it would be something like this:
(sc.parallelize(data)
   .groupBy(lambda x: x['name'])
   .map(lambda t: {'name': t[0],
                   'addresses': [(x['timestamp'], x['address']) for x in t[1]]})
   .collect())
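If performance is the main concern, a map-side combine is worth trying: groupBy shuffles every record to its group before anything is merged, whereas aggregateByKey builds partial per-key lists within each partition first. Here is a minimal sketch, assuming the same data list and SparkContext sc as above (the gain over groupBy is modest here, since plain list-building doesn't shrink the data much):
# Hypothetical sketch, not from the original answer: key each row by name,
# then build the (timestamp, address) list per key with aggregateByKey.
pairs = sc.parallelize(data).map(
    lambda row: (row['name'], (row['timestamp'], row['address'])))
grouped = pairs.aggregateByKey(
    [],                                # zero value: empty address list per key
    lambda acc, value: acc + [value],  # fold one (timestamp, address) pair in
    lambda a, b: a + b)                # merge partial lists across partitions
result = grouped.map(lambda t: {'name': t[0], 'addresses': t[1]}).collect()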
#2
All right, using maxymoo's example, I put together my own reusable code. It's not exactly what I was looking for, but it gets me closer to how I want to solve this particular problem: without lambdas and with reusable code.
#!/usr/bin/env pyspark
# -*- coding: utf-8 -*-
data = [
    {'timestamp': '20080411204445', 'address': '100 Sunder Ct', 'name': 'Joe Schmoe'},
    {'timestamp': '20040218165319', 'address': '100 Lee Ave', 'name': 'Joe Schmoe'},
    {'timestamp': '20120309173318', 'address': '1818 Westminster', 'name': 'John Doe'},
]
def combine(field):
    '''Returns a function which reduces on a specific field

    Args:
        field (str): data field to use for merging

    Returns:
        func: returns a function which supplies the data for the field
    '''
    def _reduce_this(data):
        '''Returns the field value using data'''
        return data[field]
    return _reduce_this
def aggregate(*fields):
    '''Merges data based on a list of fields

    Args:
        fields (list): a list of fields that should be used as a composite key

    Returns:
        func: a function which does the aggregation
    '''
    def _merge_this(iterable):
        name, iterable = iterable
        new_map = dict(name=name, window=dict(max=None, min=None))
        for data in iterable:
            for field, value in data.items():  # iteritems() is Python 2 only
                if field in fields:
                    new_map[field] = value
                else:
                    new_map.setdefault(field, set()).add(value)
        return new_map
    return _merge_this
# sc provided by pyspark context
combined = sc.parallelize(data).groupBy(combine('name'))
reduced = combined.map(aggregate('name'))
output = reduced.collect()
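For the sample data, output should come back with roughly the following shape (ordering of the set elements and of the two records may vary). Note that this version collects timestamps and addresses into separate sets rather than the (timestamp, address) pairs from the desired output:
# Approximate value of `output` for the sample data above:
[
    {'name': 'Joe Schmoe',
     'window': {'max': None, 'min': None},
     'timestamp': {'20080411204445', '20040218165319'},
     'address': {'100 Sunder Ct', '100 Lee Ave'}},
    {'name': 'John Doe',
     'window': {'max': None, 'min': None},
     'timestamp': {'20120309173318'},
     'address': {'1818 Westminster'}},
]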