I am using Spark to do exploratory data analysis on a user log file. One of the analysis that I am doing is average requests on daily basis per host. So in order to figure out the average, I need to divide the total request column of the DataFrame by number unique Request column of the DataFrame.
我正在使用Spark对用户日志文件进行探索性数据分析。我正在做的一项分析是每个主机每天的平均请求。因此,为了计算平均值,我需要将DataFrame的总请求列除以DataFrame的数字唯一Request列。
total_req_per_day_df = logs_df.select('host',dayofmonth('time').alias('day')).groupby('day').count()
avg_daily_req_per_host_df = total_req_per_day_df.select("day",(total_req_per_day_df["count"] / daily_hosts_df["count"]).alias("count"))
This is what I have written using the PySpark to determine the average. And here is the log of error that I get
这就是我用PySpark编写的用来确定平均值的内容。这是我得到的错误日志
AnalysisException: u'resolved attribute(s) count#1993L missing from day#3628,count#3629L in operator !Project [day#3628,(cast(count#3629L as double) / cast(count#1993L as double)) AS count#3630];
Note: daily_hosts_df and logs_df is cached in the memory. How do you divide the count column of both data frames?
注意:daily_hosts_df和logs_df缓存在内存中。如何划分两个数据帧的计数列?
4 个解决方案
#1
15
It is not possible to reference column from another table. If you want to combine data you'll have to join
first using something similar to this:
无法从另一个表中引用列。如果要组合数据,首先必须使用与此类似的东西加入:
from pyspark.sql.functions import col
(total_req_per_day_df.alias("total")
.join(daily_hosts_df.alias("host"), ["day"])
.select(col("day"), (col("total.count") / col("host.count")).alias("count")))
#2
1
It's a question from an edX Spark course assignment. Since the solution is public now I take the opportunity to share another, slower one and ask whether the performance of it could be improved or is totally anti-Spark?
这是来自edX Spark课程作业的问题。既然解决方案现在已公开,我借此机会分享另一个较慢的解决方案,并询问它的性能是否可以提高还是完全反Spark?
daily_hosts_list = (daily_hosts_df.map(lambda r: (r[0], r[1])).take(30))
days_with_hosts, hosts = zip(*daily_hosts_list)
requests = (total_req_per_day_df.map(lambda r: (r[1])).take(30))
average_requests = [(days_with_hosts[n], float(l)) for n, l in enumerate(list(np.array(requests, dtype=float) / np.array(hosts)))]
avg_daily_req_per_host_df = sqlContext.createDataFrame(average_requests, ('day', 'avg_reqs_per_host_per_day'))
#3
0
Join the two data frames on column day, and then select the day and ratio of the count columns.
在列日加入两个数据框,然后选择计数列的日期和比率。
total_req_per_day_df = logs_df.select(dayofmonth('time')
.alias('day')
).groupBy('day').count()
avg_daily_req_per_host_df = (
total_req_per_day_df.join(daily_hosts_df,
total_req_per_day_df.day == daily_hosts_df.day
)
.select(daily_hosts_df['day'],
(total_req_per_day_df['count']/daily_hosts_df['count'])
.alias('avg_reqs_per_host_per_day')
)
.cache()
)
#4
0
Solution, based on zero323 answer, but correctly works as OUTER join.
解决方案,基于zero323答案,但正确地作为OUTER加入。
avg_daily_req_per_host_df = (
total_req_per_day_df.join(
daily_hosts_df, daily_hosts_df['day'] == total_req_per_day_df['day'], 'outer'
).select(
total_req_per_day_df['day'],
(total_req_per_day_df['count']/daily_hosts_df['count']).alias('avg_reqs_per_host_per_day')
)
).cache()
Without 'outer' param you lost data for days missings in one of dataframes. This is not critical for PySpark Lab2 task, becouse both dataframes contains same dates. But can create some pain in another tasks :)
如果没有“外部”参数,您会丢失其中一个数据帧中的数据丢失数据。这对于PySpark Lab2任务并不重要,因为两个数据帧都包含相同的日期。但是可以在另一个任务中创造一些痛苦:)
#1
15
It is not possible to reference column from another table. If you want to combine data you'll have to join
first using something similar to this:
无法从另一个表中引用列。如果要组合数据,首先必须使用与此类似的东西加入:
from pyspark.sql.functions import col
(total_req_per_day_df.alias("total")
.join(daily_hosts_df.alias("host"), ["day"])
.select(col("day"), (col("total.count") / col("host.count")).alias("count")))
#2
1
It's a question from an edX Spark course assignment. Since the solution is public now I take the opportunity to share another, slower one and ask whether the performance of it could be improved or is totally anti-Spark?
这是来自edX Spark课程作业的问题。既然解决方案现在已公开,我借此机会分享另一个较慢的解决方案,并询问它的性能是否可以提高还是完全反Spark?
daily_hosts_list = (daily_hosts_df.map(lambda r: (r[0], r[1])).take(30))
days_with_hosts, hosts = zip(*daily_hosts_list)
requests = (total_req_per_day_df.map(lambda r: (r[1])).take(30))
average_requests = [(days_with_hosts[n], float(l)) for n, l in enumerate(list(np.array(requests, dtype=float) / np.array(hosts)))]
avg_daily_req_per_host_df = sqlContext.createDataFrame(average_requests, ('day', 'avg_reqs_per_host_per_day'))
#3
0
Join the two data frames on column day, and then select the day and ratio of the count columns.
在列日加入两个数据框,然后选择计数列的日期和比率。
total_req_per_day_df = logs_df.select(dayofmonth('time')
.alias('day')
).groupBy('day').count()
avg_daily_req_per_host_df = (
total_req_per_day_df.join(daily_hosts_df,
total_req_per_day_df.day == daily_hosts_df.day
)
.select(daily_hosts_df['day'],
(total_req_per_day_df['count']/daily_hosts_df['count'])
.alias('avg_reqs_per_host_per_day')
)
.cache()
)
#4
0
Solution, based on zero323 answer, but correctly works as OUTER join.
解决方案,基于zero323答案,但正确地作为OUTER加入。
avg_daily_req_per_host_df = (
total_req_per_day_df.join(
daily_hosts_df, daily_hosts_df['day'] == total_req_per_day_df['day'], 'outer'
).select(
total_req_per_day_df['day'],
(total_req_per_day_df['count']/daily_hosts_df['count']).alias('avg_reqs_per_host_per_day')
)
).cache()
Without 'outer' param you lost data for days missings in one of dataframes. This is not critical for PySpark Lab2 task, becouse both dataframes contains same dates. But can create some pain in another tasks :)
如果没有“外部”参数,您会丢失其中一个数据帧中的数据丢失数据。这对于PySpark Lab2任务并不重要,因为两个数据帧都包含相同的日期。但是可以在另一个任务中创造一些痛苦:)