How do I get the equivalent of SQL row_number for a Spark RDD?

Time: 2021-05-06 09:20:40

I need to generate a full list of row_numbers for a data table with many columns.

In SQL, this would look like this:

select
   key_value,
   col1,
   col2,
   col3,
   row_number() over (partition by key_value order by col1, col2 desc, col3)
from
   temp
;

Now, let's say that in Spark I have an RDD of the form (K, V), where V = (col1, col2, col3), so my entries look like this:

(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.

I want to order these using commands like sortBy(), sortWith(), sortByKey(), zipWithIndex, etc., and get a new RDD with the correct row_number:

(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.

(I don't care about the parentheses, so the form can also be (K, (col1,col2,col3,rownum)) instead)

How do I do this?

Here's my first attempt:

val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))

val temp1 = sc.parallelize(sample_data)

temp1.collect().foreach(println)

// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3)

temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println)

// ((((1,2),1,2,3),1),0)
// ((((1,2),1,4,7),1),1)
// ((((1,2),2,2,3),1),2)
// ((((3,4),5,5,5),1),3)
// ((((3,4),5,5,9),1),4)
// ((((3,4),7,5,5),1),5)

// note that this isn't ordering with a partition on key value K!

val temp2 = temp1.???

Also note that the function sortBy cannot be applied directly to the RDD here; one must run collect() first, and then the output isn't an RDD either, but an array:

temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println)

// ((1,2),1,4,7)
// ((1,2),1,2,3)
// ((1,2),2,2,3)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
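
(For what it's worth, the RDD API itself also offers a sortBy transformation, so the collect() isn't strictly required for this global sort. A minimal sketch against temp1; note that it still produces one global ordering rather than one that restarts per key:)

// sortBy on the RDD itself: same global ordering as above, still not partitioned by K
temp1.sortBy { case (_, c1, c2, c3) => (c1, -c2, c3) }.collect().foreach(println)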

Here's a little more progress, but still not partitioned:

val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2, a._3, a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1))

temp2.collect().foreach(println)

// ((1,2),1,4,7,1)
// ((1,2),1,2,3,2)
// ((1,2),2,2,3,3)
// ((3,4),5,5,5,4)
// ((3,4),5,5,9,5)
// ((3,4),7,5,5,6)

2 Solutions

#1 (15 votes)

The row_number() over (partition by ... order by ...) functionality was added to Spark 1.4. This answer uses PySpark/DataFrames.

Create a test DataFrame:

from pyspark.sql import Row, functions as F

testDF = sc.parallelize(
    (Row(k="key1", v=(1,2,3)),
     Row(k="key1", v=(1,4,7)),
     Row(k="key1", v=(2,2,3)),
     Row(k="key2", v=(5,5,5)),
     Row(k="key2", v=(5,5,9)),
     Row(k="key2", v=(7,5,5))
    )
).toDF()

Add the partitioned row number:

from pyspark.sql.window import Window

(testDF
 .select("k", "v",
         F.rowNumber()
         .over(Window
               .partitionBy("k")
               .orderBy("k")
              )
         .alias("rowNum")
        )
 .show()
)

+----+-------+------+
|   k|      v|rowNum|
+----+-------+------+
|key1|[1,2,3]|     1|
|key1|[1,4,7]|     2|
|key1|[2,2,3]|     3|
|key2|[5,5,5]|     1|
|key2|[5,5,9]|     2|
|key2|[7,5,5]|     3|
+----+-------+------+
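
As an aside, F.rowNumber() was renamed to F.row_number() in later Spark releases. For the ordering the question actually asks for (order by col1, col2 desc, col3), a rough Scala sketch might look like the following; it assumes a hypothetical flat DataFrame flatDF with columns k, col1, col2, col3 rather than the struct column v used above:

// sketch only: flatDF with columns k, col1, col2, col3 is an assumed, flattened layout
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val w = Window.partitionBy("k").orderBy(col("col1"), col("col2").desc, col("col3"))
flatDF.withColumn("rowNum", row_number().over(w)).show()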

#2 (4 votes)

This is an interesting problem you're bringing up. I will answer it in Python but I'm sure you will be able to translate seamlessly to Scala.

Here is how I would tackle it:

1- Simplify your data:

temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3])))

temp2 is now a "real" key-value pair RDD. It looks like this:

[
((3, 4), (5, 5, 5)),  
((3, 4), (5, 5, 9)),   
((3, 4), (7, 5, 5)),   
((1, 2), (1, 2, 3)),  
((1, 2), (1, 4, 7)),   
((1, 2), (2, 2, 3))

]

2- Then, use the group-by function to reproduce the effect of the PARTITION BY:

temp3 = temp2.groupByKey()

temp3 is now an RDD with 2 rows:

[((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>),  
 ((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)]

3- Now, you need to apply a rank function to each value of the RDD. In Python, I would use the built-in sorted function (enumerate will create your row_number column):

 temp4 = temp3.flatMap(lambda x: tuple([(x[0],(i[1],i[0])) for i in enumerate(sorted(x[1]))])).take(10)

Note that to implement your particular order, you would need to feed the right "key" argument to sorted (in Python, I would just create a lambda function like this one):

lambda t: (t[0], -t[1], t[2])

At the end (without the key argument), it looks like this:

[
((1, 2), ((1, 2, 3), 0)), 
((1, 2), ((1, 4, 7), 1)), 
((1, 2), ((2, 2, 3), 2)), 
((3, 4), ((5, 5, 5), 0)), 
((3, 4), ((5, 5, 9), 1)), 
((3, 4), ((7, 5, 5), 2))

]
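
Translated back to the temp1 RDD from the question, the same three steps might look roughly like this in Scala (an untested sketch, using the ordering col1 asc, col2 desc, col3 asc and 1-based numbering):

// step 1: real key-value pairs; step 2: groupByKey as the "partition by";
// step 3: sort and number within each group
val temp2 = temp1
  .map { case (k, c1, c2, c3) => (k, (c1, c2, c3)) }
  .groupByKey()
  .flatMap { case (k, vals) =>
    vals.toSeq
      .sortBy { case (c1, c2, c3) => (c1, -c2, c3) }   // order by col1, col2 desc, col3
      .zipWithIndex
      .map { case (v, i) => (k, v, i + 1) }            // row_number starts at 1
  }

temp2.collect().foreach(println)

As with the Python version, groupByKey pulls each key's whole group onto one executor, so this assumes every group fits in memory.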

Hope that helps!

Good luck.
