I have code that looks like this:
print df.groupBy('offer_id', 'record_id').avg().collect()
This works, but when I pass a string variable containing the column names instead:
print df.groupBy(stringNamesDF).avg().collect()
it fails with:
org.apache.spark.sql.AnalysisException: cannot resolve ''record_id', 'assigned_offer_id', 'accepted_offer_flag', 'current_offer_flag', 'offer_good_until_date', 'rescinded_date', 'first_pymt_date', 'contract_date', 'acct_nbr', 'acct_nbr_assigned_dttm', 'acct_expiration_dttm', 'offer_desc', 'offer_sales_script', 'presentable_flag', 'insrt_dttm', 'insrt_usr_id', 'chng_dttm', 'chng_usr_id', 'actv_flag', 'correlation_id', 'offer_status_type_cd', 'presentation_instrument_nbr'' given input columns pymt, rescindable_days, rescinded_date, market_cell_id, offer_sales_script, assigned_offer_id, offer_desc, rate_index_type_cd, nbr_of_pymts, campaign_id, down_pymt, offer_status_type_cd, offer_type_cd, acct_expiration_dttm, record_id, origination_fee_rate, insrt_usr_id, promo_id, term_mm, min_amount, offer_good_until_date, decision_id, insrt_dttm, late_fee_min_amount, late_fee_percent, offer_id, origination_fee_amount, presentation_instrument_nbr, offer_order, chng_usr_id, correlation_id, acct_nbr_assigned_dttm, chng_dttm, presentable_flag, accepted_offer_flag, amount, min_rate, max_rate, acct_nbr, actv_flag, sub_product_id, cs_result_id, current_offer_flag, finance_charge, annual_fee_waived_mm, cs_result_usage_type_cd, max_amount, total_pymts, contract_date, index_rate, first_pymt_date, annual_fee_amount, rate, amount_financed, pymt_method_type_cd;
Printing stringNamesDF shows:
'record_id', 'assigned_offer_id', 'accepted_offer_flag', 'current_offer_flag', 'offer_good_until_date', 'rescinded_date', 'first_pymt_date', 'contract_date', 'acct_nbr', 'acct_nbr_assigned_dttm', 'acct_expiration_dttm', 'offer_desc', 'offer_sales_script', 'presentable_flag', 'insrt_dttm', 'insrt_usr_id', 'chng_dttm', 'chng_usr_id', 'actv_flag', 'correlation_id', 'offer_status_type_cd', 'presentation_instrument_nbr'
I've also tried with stringNamesDF looking like this:
record_id, assigned_offer_id, accepted_offer_flag, current_offer_flag, offer_good_until_date, rescinded_date, first_pymt_date, contract_date, acct_nbr, acct_nbr_assigned_dttm, acct_expiration_dttm, offer_desc, offer_sales_script, presentable_flag, insrt_dttm, insrt_usr_id, chng_dttm, chng_usr_id, actv_flag, correlation_id, offer_status_type_cd, presentation_instrument_nbr
but I get this:
org.apache.spark.sql.AnalysisException: cannot resolve 'record_id, assigned_offer_id, accepted_offer_flag, current_offer_flag, offer_good_until_date, rescinded_date, first_pymt_date, contract_date, acct_nbr, acct_nbr_assigned_dttm, acct_expiration_dttm, offer_desc, offer_sales_script, presentable_flag, insrt_dttm, insrt_usr_id, chng_dttm, chng_usr_id, actv_flag, correlation_id, offer_status_type_cd, presentation_instrument_nbr' given input columns pymt, rescindable_days, rescinded_date, market_cell_id, offer_sales_script, assigned_offer_id, offer_desc, rate_index_type_cd, nbr_of_pymts, campaign_id, down_pymt, offer_status_type_cd, offer_type_cd, acct_expiration_dttm, record_id, origination_fee_rate, insrt_usr_id, promo_id, term_mm, min_amount, offer_good_until_date, decision_id, insrt_dttm, late_fee_min_amount, late_fee_percent, offer_id, origination_fee_amount, presentation_instrument_nbr, offer_order, chng_usr_id, correlation_id, acct_nbr_assigned_dttm, chng_dttm, presentable_flag, accepted_offer_flag, amount, min_rate, max_rate, acct_nbr, actv_flag, sub_product_id, cs_result_id, current_offer_flag, finance_charge, annual_fee_waived_mm, cs_result_usage_type_cd, max_amount, total_pymts, contract_date, index_rate, first_pymt_date, annual_fee_amount, rate, amount_financed, pymt_method_type_cd;
Edit: I've also tried a list, stringNames, which looks like this, again with no success:
['record_id', 'assigned_offer_id', 'accepted_offer_flag', 'current_offer_flag', 'offer_good_until_date', 'rescinded_date', 'first_pymt_date', 'contract_date', 'acct_nbr', 'acct_nbr_assigned_dttm', 'acct_expiration_dttm', 'offer_desc', 'offer_sales_script', 'presentable_flag', 'insrt_dttm', 'insrt_usr_id', 'chng_dttm', 'chng_usr_id', 'actv_flag', 'correlation_id', 'offer_status_type_cd', 'presentation_instrument_nbr']
and get AttributeError: 'list' object has no attribute '_get_object_id'
2 Answers
#1
Try:
print df.groupBy(stringNamesDF.split(", ")).avg().collect()
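Note that if stringNamesDF is the quoted form from the question, split(", ") leaves the single quotes in each name, so Spark still won't resolve them. A minimal sketch that strips them as well, assuming stringNamesDF is a plain Python string (the sample value below is illustrative):
# Illustrative value; the real string comes from wherever stringNamesDF is built
stringNamesDF = "'record_id', 'assigned_offer_id', 'accepted_offer_flag'"
# Split on commas, then drop surrounding whitespace and single quotes
cols = [c.strip().strip("'") for c in stringNamesDF.split(",")]
# Unpack with * so each name is passed as its own argument; this also avoids
# the 'list' object has no attribute '_get_object_id' error on older versions
print df.groupBy(*cols).avg().collect()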
#2
When stringNamesDF is a list:
stringNamesDF=['record_id', 'assigned_offer_id', 'accepted_offer_flag', 'current_offer_flag', 'offer_good_until_date', 'rescinded_date', 'first_pymt_date', 'contract_date', 'acct_nbr', 'acct_nbr_assigned_dttm', 'acct_expiration_dttm', 'offer_desc', 'offer_sales_script', 'presentable_flag', 'insrt_dttm', 'insrt_usr_id', 'chng_dttm', 'chng_usr_id', 'actv_flag', 'correlation_id', 'offer_status_type_cd', 'presentation_instrument_nbr']
Use:
df.groupBy(*stringNamesDF).avg().collect()
from https://spark.apache.org/docs/latest/api/python/pyspark.sql.html
groupBy(*cols)
Groups the DataFrame using the specified columns, so we can run aggregation on them.
Parameters: cols – list of columns to group by. Each element should be a column name (string) or an expression (Column).
Example:
l = [('Alice', '2015-02-02', 1), ('Alice', '2015-02-02', 2), ('Alice', '2015-02-03', 1), ('Bob', '2015-02-03', 1), ('Bob', '2015-02-03', 3)]
ddf = sqlContext.createDataFrame(l, ['name', 'date', 'clicks'])
ddf.groupBy(*['name', 'date']).avg().collect()
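The * is what makes this work: it unpacks the list so each column name is passed as a separate argument, exactly as if you had written ddf.groupBy('name', 'date'). Without it, older PySpark versions hand the raw Python list to py4j, which is what produces the AttributeError: 'list' object has no attribute '_get_object_id' from the question. A quick sketch of the difference, reusing ddf from above:
# Unpacked: equivalent to ddf.groupBy('name', 'date')
ddf.groupBy(*['name', 'date']).avg().collect()
# Not unpacked: on older PySpark versions the list reaches py4j unconverted
# and raises AttributeError: 'list' object has no attribute '_get_object_id'
# ddf.groupBy(['name', 'date']).avg().collect()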