I am trying to get the top n items in a dataset .
我试图获取数据集中的前n项。
Initially I did this.
最初我这样做了。
var df = Seq( (1 , "row1") , (2,"row2"), (1,"row11") , (1 , null) ).toDF()
df=df.select($"_1".alias("p_int"), $"_2".alias("p_string"))
val resultDf =df.where($"p_string".isNotNull).select( $"p_int" ,$"p_int" +1 , upper($"p_string") , rank().over(Window.partitionBy($"p_int").orderBy( $"p_string" )) as "RANKINDEX", row_number().over(Window.partitionBy($"p_int").orderBy( $"p_string" )) as "ROWNUMBER" ).where($"ROWNUMBER" <= 2 )
But I want to avoid the performance cost of the operation "where($"ROWNUMBER" <= 10 )"
但我想避免操作的性能成本“where($”ROWNUMBER“<= 10)”
So I decided to do the following
所以我决定做以下事情
var df = Seq( (1 , "row1") , (2,"row2"), (1,"row11") , (1 , null) ).toDF()
df=df.select($"_1".alias("p_int"), $"_2".alias("p_string"))
val test =df.where($"p_string".isNotNull).select( $"p_int" ,$"p_int" +1 , upper($"p_string") , rank().over(Window.partitionBy($"p_int").orderBy( $"p_string" )) as "RANKINDEX", row_number().over(Window.partitionBy($"p_int").orderBy( $"p_string" )) as "ROWNUMBER" )
implicit val encoder = RowEncoder(test.schema)
var temp =test.mapPartitions( _.take(2))
However , my testing seems to show that this will not produce the correct output .
但是,我的测试似乎表明这不会产生正确的输出。
Any thoughts why . Wouldn't the take function on the iterator obtained from the window dataset get the first n elements in the iterator?
任何想法为什么。从窗口数据集获取的迭代器上的take函数不会获得迭代器中的前n个元素吗?
1 个解决方案
#1
0
Partitions of the Dataset
don't have have one-to-one correspondence with PARTITION BY
clause. All the magic in OVER (PARTITION BY ...)
happens on the much lower level and a single physical partition will process multiple ids.
数据集的分区与PARTITION BY子句没有一对一的对应关系。 OVER(PARTITION BY ...)中的所有魔法都发生在更低的级别上,单个物理分区将处理多个ID。
Also you don't really save the work. To correctly generates row_numbers
Spark will have to shuffle, sort and scan all the data. You'll need much lower level mechanisms to avoid full shuffle and sort (for example Aggregator
with binary heap).
你也没有真正保存工作。要正确生成row_numbers,Spark必须对所有数据进行随机排序,排序和扫描。您需要更低级别的机制来避免完全混乱和排序(例如,带有二进制堆的聚合器)。
#1
0
Partitions of the Dataset
don't have have one-to-one correspondence with PARTITION BY
clause. All the magic in OVER (PARTITION BY ...)
happens on the much lower level and a single physical partition will process multiple ids.
数据集的分区与PARTITION BY子句没有一对一的对应关系。 OVER(PARTITION BY ...)中的所有魔法都发生在更低的级别上,单个物理分区将处理多个ID。
Also you don't really save the work. To correctly generates row_numbers
Spark will have to shuffle, sort and scan all the data. You'll need much lower level mechanisms to avoid full shuffle and sort (for example Aggregator
with binary heap).
你也没有真正保存工作。要正确生成row_numbers,Spark必须对所有数据进行随机排序,排序和扫描。您需要更低级别的机制来避免完全混乱和排序(例如,带有二进制堆的聚合器)。