I am new to the Spark/Scala world, and I have a question about selecting data from DataFrames. I have a table with the following data, and for each (cust, user_id) pair I need to select all of the most recently modified records, i.e. every record with the maximum modify_time for that pair:
Original data frame:
+-----+-------+----------+----------------+
| cust|user_id|another_id|     modify_time|
+-----+-------+----------+----------------+
|cust1|      1|       222|2017-03-22 07:29|
|cust1|      1|       111|2017-03-22 07:29|
|cust2|      2|       111|2017-03-21 07:29|
|cust1|      1|       333|2017-03-21 07:29|
|cust2|      2|       444|2017-03-22 07:29|
|cust2|      2|       333|2017-03-22 07:29|
+-----+-------+----------+----------------+
The required result:
+-----+-------+----------+----------------+
| cust|user_id|another_id|     modify_time|
+-----+-------+----------+----------------+
|cust1|      1|       222|2017-03-22 07:29|
|cust1|      1|       111|2017-03-22 07:29|
|cust2|      2|       444|2017-03-22 07:29|
|cust2|      2|       333|2017-03-22 07:29|
+-----+-------+----------+----------------+
What is the most efficient way to do so?
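A common approach (not taken from the original post) is a window function: rank the rows inside each (cust, user_id) partition by modify_time, newest first, and keep rank 1. `rank` gives tied rows the same rank, so every record that shares the latest time survives, which is what the required output needs. This is a minimal sketch assuming Spark 2.2+ (for `to_timestamp`), that user_id is an integer, and that modify_time is stored as a timestamp; the object and method names (`LatestPerKeyWindow`, `latestPerKey`) are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, rank, to_timestamp}

// Hypothetical helper object; in spark-shell a `spark` session already exists.
object LatestPerKeyWindow {
  val spark: SparkSession = SparkSession.builder()
    .appName("latest-per-key-window")
    .master("local[*]")
    .getOrCreate()
  import spark.implicits._

  // Sample data from the question. Assumption: user_id is an Int and
  // modify_time is parsed into a proper timestamp column.
  val df: DataFrame = Seq(
    ("cust1", 1, 222, "2017-03-22 07:29"),
    ("cust1", 1, 111, "2017-03-22 07:29"),
    ("cust2", 2, 111, "2017-03-21 07:29"),
    ("cust1", 1, 333, "2017-03-21 07:29"),
    ("cust2", 2, 444, "2017-03-22 07:29"),
    ("cust2", 2, 333, "2017-03-22 07:29")
  ).toDF("cust", "user_id", "another_id", "modify_time")
    .withColumn("modify_time", to_timestamp(col("modify_time"), "yyyy-MM-dd HH:mm"))

  // Keep every row whose modify_time is the latest for its (cust, user_id) pair.
  def latestPerKey(input: DataFrame): DataFrame = {
    val byKeyNewestFirst = Window
      .partitionBy("cust", "user_id")
      .orderBy(col("modify_time").desc)
    input
      .withColumn("rnk", rank().over(byKeyNewestFirst)) // ties share rank 1
      .filter(col("rnk") === 1)
      .drop("rnk")
  }

  def main(args: Array[String]): Unit = {
    latestPerKey(df).show()
    spark.stop()
  }
}
```

Swapping `rank` for `row_number` would keep only one arbitrary row per pair, dropping one of the two tied records for each customer.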
I did the following and it gave me the expected result:
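import java.sql.Timestamp
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.max
import spark.implicits._ // enables the $"col" syntax

// Latest modify_time per (cust, user_id), collected to the driver
val custUserModifyTime = df
  .groupBy($"cust", $"user_id").agg(max($"modify_time").alias("max_modify_time")).collect()
// One filtered DataFrame per pair; Column comparisons avoid hand-built SQL strings and type mismatches
val mostRecent: Seq[DataFrame] = custUserModifyTime.toSeq.map(x => df.where(
  $"cust" === x.getAs[String]("cust") &&
  $"user_id" === x.getAs[Any]("user_id") &&
  $"modify_time" === x.getAs[Timestamp]("max_modify_time")))
val unifiedMostUpdatedData = mostRecent.reduce((a, b) => a.union(b))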
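The approach above collects one row per pair to the driver and then unions one filtered DataFrame per pair, so the query plan grows with the number of pairs. A sketch of the same idea (compute the maximum per pair, keep matching rows) expressed as a single aggregation plus join is shown below; it assumes the same schema as the question (integer user_id, timestamp modify_time), and the names `LatestPerKeyJoin`, `latestPerKey` and `maxTimes` are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, max, to_timestamp}

// Hypothetical helper object; in spark-shell a `spark` session already exists.
object LatestPerKeyJoin {
  val spark: SparkSession = SparkSession.builder()
    .appName("latest-per-key-join")
    .master("local[*]")
    .getOrCreate()
  import spark.implicits._

  // Sample data from the question (assumed types: Int user_id, timestamp modify_time).
  val df: DataFrame = Seq(
    ("cust1", 1, 222, "2017-03-22 07:29"),
    ("cust1", 1, 111, "2017-03-22 07:29"),
    ("cust2", 2, 111, "2017-03-21 07:29"),
    ("cust1", 1, 333, "2017-03-21 07:29"),
    ("cust2", 2, 444, "2017-03-22 07:29"),
    ("cust2", 2, 333, "2017-03-22 07:29")
  ).toDF("cust", "user_id", "another_id", "modify_time")
    .withColumn("modify_time", to_timestamp(col("modify_time"), "yyyy-MM-dd HH:mm"))

  def latestPerKey(input: DataFrame): DataFrame = {
    // One row per (cust, user_id) holding its latest modify_time.
    val maxTimes = input
      .groupBy("cust", "user_id")
      .agg(max("modify_time").alias("modify_time"))
    // Inner join on all three columns keeps every original row whose
    // modify_time equals its pair's maximum, ties included.
    input.join(maxTimes, Seq("cust", "user_id", "modify_time"))
  }

  def main(args: Array[String]): Unit = {
    latestPerKey(df).show()
    spark.stop()
  }
}
```

Nothing is pulled to the driver, and the plan stays one aggregation plus one join no matter how many (cust, user_id) pairs exist. Note that a join on `Seq(...)` column names puts the join columns first, so the output column order differs from the input.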