Transforming a Spark Dataset - counting and merging multiple rows by ID

Time: 2023-02-04 23:11:08

After some data processing, I end up with this Dataset:


Dataset<Row> counts; // columns: ID, COUNT, DAY_OF_WEEK

Now I want to transform it into the following format and save it as CSV:



One approach I can think of is:


JavaPairRDD<Long, Map<Integer, Integer>> r = counts.toJavaRDD().mapToPair(...)
JavaPairRDD<Long, Map<Integer, Integer>> merged = r.reduceByKey(...);

Here each element pairs the ID with a map of 7 entries (one count per day of the week). Once I have the JavaPairRDD, I can store it as CSV. Is there a simpler way to do this transformation without converting to an RDD?
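For reference, the mapToPair + reduceByKey plan above boils down to the merge below. This is a plain-Java model over java.util maps, not Spark code. The class ReduceByKeyModel, the CountRow holder and mergeMaps are hypothetical names. Summing the counts when the same day appears twice for one ID is an assumption.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Spark) of the mapToPair + reduceByKey plan:
// each (ID, COUNT, DAY_OF_WEEK) row becomes (ID, {day -> count}),
// and the maps that share an ID are merged into one.
class ReduceByKeyModel {

    // One input row, mirroring the ID, COUNT, DAY_OF_WEEK columns.
    static final class CountRow {
        final long id;
        final int count;
        final int dayOfWeek;

        CountRow(long id, int count, int dayOfWeek) {
            this.id = id;
            this.count = count;
            this.dayOfWeek = dayOfWeek;
        }
    }

    // Hypothetical merge function of the kind passed to reduceByKey(...).
    // Assumption: if a day shows up twice for one ID, the counts are summed.
    static Map<Integer, Integer> mergeMaps(Map<Integer, Integer> a, Map<Integer, Integer> b) {
        Map<Integer, Integer> out = new HashMap<>(a);
        b.forEach((day, cnt) -> out.merge(day, cnt, Integer::sum));
        return out;
    }

    static Map<Long, Map<Integer, Integer>> byId(List<CountRow> rows) {
        Map<Long, Map<Integer, Integer>> result = new HashMap<>();
        for (CountRow r : rows) {
            // "mapToPair": a one-entry map {day -> count} keyed by the row's ID
            Map<Integer, Integer> single = new HashMap<>();
            single.put(r.dayOfWeek, r.count);
            // "reduceByKey": merge with the map already stored for this ID
            result.merge(r.id, single, ReduceByKeyModel::mergeMaps);
        }
        return result;
    }
}
```

Every ID ends up with one map of at most seven day -> count entries. That is the per-ID shape the question wants, whichever API produces it.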


1 solution



You can use the struct function to build a pair from cnt and day, and then do a groupBy with collect_list. Something like this (Scala, but you can easily convert it to Java):
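The model below shows, without Spark, what groupBy("id") with collect_list(struct(cnt, day)) produces for each ID. It uses only java.util collections. It is not Spark code, and the class, method and field names are hypothetical. It also makes one detail explicit: Spark does not guarantee the element order of collect_list. The lists are therefore sorted by day before anything reads them by position.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Spark code) of
//   df.groupBy("id").agg(collect_list(struct("cnt", "day")))
// Every ID ends up with one list of (cnt, day) pairs.
class CollectListModel {

    // Stand-in for one struct(cnt, day) value.
    static final class CntDay {
        final int cnt;
        final int day;

        CntDay(int cnt, int day) {
            this.cnt = cnt;
            this.day = day;
        }
    }

    // Each input row is {id, cnt, day}.
    static Map<Integer, List<CntDay>> collectList(List<int[]> rows) {
        Map<Integer, List<CntDay>> grouped = new LinkedHashMap<>();
        for (int[] r : rows) {
            grouped.computeIfAbsent(r[0], k -> new ArrayList<>()).add(new CntDay(r[1], r[2]));
        }
        // collect_list gives no ordering guarantee in Spark,
        // so sort each list by day before it is read by position.
        for (List<CntDay> pairs : grouped.values()) {
            pairs.sort(Comparator.comparingInt((CntDay p) -> p.day));
        }
        return grouped;
    }
}
```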



Now you can write a UDF that extracts the relevant column. Use withColumn in a loop to copy the ID (df.withColumn("id2", col("id"))), then create a UDF that extracts the count at position i of the collected list, apply it for every position, and finally do the same for day.


If you put the columns in the order you want and drop the irrelevant ones, you get the format you asked for.
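The extraction step can be modeled the same way in plain Java. This is again not Spark code, and the class and method names are hypothetical. The model takes one ID's list of (cnt, day) pairs, already sorted by day, and fills one id/cnt/day group per position. The id is copied into every group, as withColumn("id2", col("id")) would do. cnt and day stay null wherever the list is shorter, which is how a hypothetical UDF returning null for a missing index would behave.

```java
import java.util.List;

// Plain-Java model (not Spark) of the "extract position i" step: a sorted
// per-ID list of (cnt, day) pairs is spread into fixed column groups
//   id, cnt, day | id, cnt, day | ...   (one group per position 0..slots-1)
class ExtractByPositionModel {

    // pairs: {cnt, day} entries for one ID, already sorted by day.
    static Integer[] flatten(int id, List<int[]> pairs, int slots) {
        Integer[] out = new Integer[3 * slots];
        for (int i = 0; i < slots; i++) {
            out[3 * i] = id;                      // copied ID, present in every group
            if (i < pairs.size()) {
                out[3 * i + 1] = pairs.get(i)[0]; // count at position i
                out[3 * i + 2] = pairs.get(i)[1]; // day at position i
            }                                     // else cnt/day remain null
        }
        return out;
    }
}
```

Because this approach works by position, an ID with gaps is packed to the left. In the sample data below, ID 222 has only days 1 and 4, so its day-4 values land in the second group. Pivot, by contrast, lines values up by day.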


You can also use pivot (again in Scala, but it should be easy to convert to Java):


+---+---+---+
| id|cnt|day|
+---+---+---+
|333| 31|  1|
|333| 32|  2|
|333|133|  3|
|333| 34|  4|
|333| 35|  5|
|333| 36|  6|
|333| 37|  7|
|222| 41|  4|
|111| 11|  1|
|111| 22|  2|
|111| 33|  3|
|111| 44|  4|
|111| 55|  5|
|111| 66|  6|
|111| 77|  7|
|222| 21|  1|
+---+---+---+

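import org.apache.spark.sql.functions.{col, first, struct}

val df2 = df.withColumn("all", struct(col("id"), col("cnt"), col("day")))
// one struct column per day; listing 1..7 keeps all seven columns even if a day never occurs
val res = df2.groupBy("id").pivot("day", Seq(1, 2, 3, 4, 5, 6, 7)).agg(first(col("all")))
  .select("1.*", "2.*", "3.*", "4.*", "5.*", "6.*", "7.*") // flatten each struct into id, cnt, day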

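+---+---+---+----+----+----+----+----+----+---+---+---+----+----+----+----+----+----+----+----+----+
| id|cnt|day|  id| cnt| day|  id| cnt| day| id|cnt|day|  id| cnt| day|  id| cnt| day|  id| cnt| day|
+---+---+---+----+----+----+----+----+----+---+---+---+----+----+----+----+----+----+----+----+----+
|333| 31|  1| 333|  32|   2| 333| 133|   3|333| 34|  4| 333|  35|   5| 333|  36|   6| 333|  37|   7|
|222| 21|  1|null|null|null|null|null|null|222| 41|  4|null|null|null|null|null|null|null|null|null|
|111| 11|  1| 111|  22|   2| 111|  33|   3|111| 44|  4| 111|  55|   5| 111|  66|   6| 111|  77|   7|
+---+---+---+----+----+----+----+----+----+---+---+---+----+----+----+----+----+----+----+----+----+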
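The pivot result above can be checked with a similar plain-Java model. It builds one wide row per id, and the id, cnt, day triple for day d sits in columns 3*(d-1) to 3*(d-1)+2. Missing days are null, as in the ID 222 row. A helper renders a row as a CSV line with empty fields for nulls. This is a sketch for checking the layout, not Spark code, and the names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Plain-Java model (not Spark) of
//   df2.groupBy("id").pivot("day", 1..7).agg(first(col("all")))
//      .select("1.*", "2.*", ..., "7.*")
// One wide row per id; day d occupies columns 3*(d-1) .. 3*(d-1)+2 (id, cnt, day).
class PivotModel {

    // rows are {id, cnt, day}; day must be in 1..7.
    static Map<Integer, Integer[]> pivot(List<int[]> rows) {
        Map<Integer, Integer[]> wide = new LinkedHashMap<>();
        for (int[] r : rows) {
            Integer[] line = wide.computeIfAbsent(r[0], k -> new Integer[21]);
            int base = 3 * (r[2] - 1);
            // first(...): keep the first value seen for this (id, day) cell
            if (line[base] == null) {
                line[base] = r[0];
                line[base + 1] = r[1];
                line[base + 2] = r[2];
            }
        }
        return wide;
    }

    // Render one wide row as a CSV line, writing null cells as empty fields.
    static String toCsvLine(Integer[] line) {
        StringJoiner joiner = new StringJoiner(",");
        for (Integer v : line) {
            joiner.add(v == null ? "" : v.toString());
        }
        return joiner.toString();
    }
}
```

On the Spark side, the pivoted result repeats the column names id, cnt and day. Spark's file writers generally reject duplicate column names, so you will probably need to give the columns unique names (for example with toDF(...)) before calling write().csv(...).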