Transforming a Spark Dataset - counting and merging multiple rows by ID

Time: 2023-02-04 23:11:08

After some data processing, I end up with this Dataset:

Dataset<Row> counts // ID, COUNT, DAY_OF_WEEK

Now I want to transform this to this format and save as CSV:

ID,COUNT_DoW1, ID,COUNT_DoW2, ID,COUNT_DoW3,..ID,COUNT_DoW7

I can think of one approach:

JavaPairRDD<Long, Map<Integer, Integer>> r = counts.toJavaRDD().mapToPair(...)
JavaPairRDD<Long, Map<Integer, Integer>> merged = r.reduceByKey(...);

Where it's a pair of "ID" and a list of size 7. After I get the JavaPairRDD, I can store it as CSV. Is there a simpler approach for this transformation without converting it to an RDD?

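For context, here is a rough sketch of what that RDD approach could look like; the lambda bodies and column positions below are my guesses from the Dataset comment above, not actual code from the question:

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// Hypothetical sketch: map each row to (ID, {DAY_OF_WEEK -> COUNT}), then merge the maps per ID.
// Column positions (0 = ID, 1 = COUNT, 2 = DAY_OF_WEEK) are assumed from the comment above.
JavaPairRDD<Long, Map<Integer, Integer>> r = counts.toJavaRDD().mapToPair(row -> {
    Map<Integer, Integer> m = new HashMap<>();
    m.put(row.getInt(2), row.getInt(1));
    return new Tuple2<>(row.getLong(0), m);
});
JavaPairRDD<Long, Map<Integer, Integer>> merged = r.reduceByKey((a, b) -> {
    Map<Integer, Integer> all = new HashMap<>(a);
    all.putAll(b); // assumes at most one row per (ID, day)
    return all;
});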

1 solution

#1


You can use the struct function to construct a pair from cnt and day and then do a groupBy with collect_list. Something like this (Scala, but you can easily convert it to Java):

df.groupBy("ID").agg(collect_list(struct("COUNT","DAY")))

Now you can write a UDF which extracts the relevant column. First, do a withColumn in a loop to copy the ID (df.withColumn("id2", col("id")));

then create a UDF which extracts the count element at position i, run it across all the columns, and lastly do the same for day.

If you keep the order you want and drop the irrelevant columns, you get what you asked for.

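A minimal Java sketch of this collect_list idea; it uses getItem/getField for the positional extraction instead of a handwritten UDF, and it assumes every ID has a row for all 7 days so that position i corresponds to day i+1:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Collect the (DAY_OF_WEEK, COUNT) pairs per ID, sorted by day so position i is day i+1.
Dataset<Row> grouped = counts.groupBy("ID")
        .agg(sort_array(collect_list(struct("DAY_OF_WEEK", "COUNT"))).as("pairs"));

// One ID/COUNT column pair per day of week, matching the layout from the question.
Dataset<Row> flat = grouped;
for (int i = 0; i < 7; i++) {
    flat = flat
            .withColumn("ID_DoW" + (i + 1), col("ID"))
            .withColumn("COUNT_DoW" + (i + 1), col("pairs").getItem(i).getField("COUNT"));
}
flat = flat.drop("ID", "pairs");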

You can also work with the pivot command (again in Scala, but you should be able to easily convert it to Java):

df.show()
>>+---+---+---+
>>| id|cnt|day|
>>+---+---+---+
>>|333| 31|  1|
>>|333| 32|  2|
>>|333|133|  3|
>>|333| 34|  4|
>>|333| 35|  5|
>>|333| 36|  6|
>>|333| 37|  7|
>>|222| 41|  4|
>>|111| 11|  1|
>>|111| 22|  2|
>>|111| 33|  3|
>>|111| 44|  4|
>>|111| 55|  5|
>>|111| 66|  6|
>>|111| 77|  7|
>>|222| 21|  1|
>>+---+---+---+

val df2 = df.withColumn("all", struct('id, 'cnt, 'day))

val res = df2.groupBy("id").pivot("day").agg(first('all).as("bla")).select("1.*", "2.*", "3.*", "4.*", "5.*", "6.*", "7.*")

res.show()
>>+---+---+---+----+----+----+----+----+----+---+---+---+----+----+----+----+----+----+----+----+----+
>>| id|cnt|day|  id| cnt| day|  id| cnt| day| id|cnt|day|  id| cnt| day|  id| cnt| day|  id| cnt| day|
>>+---+---+---+----+----+----+----+----+----+---+---+---+----+----+----+----+----+----+----+----+----+
>>|333| 31|  1| 333|  32|   2| 333| 133|   3|333| 34|  4| 333|  35|   5| 333|  36|   6| 333|  37|   7|
>>|222| 21|  1|null|null|null|null|null|null|222| 41|  4|null|null|null|null|null|null|null|null|null|
>>|111| 11|  1| 111|  22|   2| 111|  33|   3|111| 44|  4| 111|  55|   5| 111|  66|   6| 111|  77|   7|
>>+---+---+---+----+----+----+----+----+----+---+---+---+----+----+----+----+----+----+----+----+----+
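One caveat when saving: the pivoted result above repeats the id/cnt/day column names, and Spark refuses to write a CSV with duplicate column names, so rename them first. A small Java sketch, assuming res is the equivalent Dataset<Row> and the output path is a placeholder:

// Give the 21 positional columns unique names (ID_DoW1, COUNT_DoW1, DAY_DoW1, ...)
// before writing, since the CSV writer rejects duplicate column names.
String[] names = new String[21];
for (int day = 1; day <= 7; day++) {
    names[(day - 1) * 3] = "ID_DoW" + day;
    names[(day - 1) * 3 + 1] = "COUNT_DoW" + day;
    names[(day - 1) * 3 + 2] = "DAY_DoW" + day;
}
res.toDF(names).write().option("header", "true").csv("/path/to/output"); // placeholder path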
