Find the maximum row per group in a Spark DataFrame

Time: 2021-10-22 04:28:57

I'm trying to use Spark DataFrames instead of RDDs, since they appear to be higher-level than RDDs and tend to produce more readable code, but I would be more than happy to get suggestions for something more idiomatic for the task at hand.

In a 14-node Google Dataproc cluster, I have about 6 million names that are translated to ids by two different systems: sa and sb. Each Row contains name, id_sa and id_sb. My goal is to produce a mapping from id_sa to id_sb such that, for each id_sa, the corresponding id_sb is the most frequent id among all the names attached to that id_sa.

Let's try to clarify with an example. If I have the following rows:

[Row(name='n1', id_sa='a1', id_sb='b1'),
 Row(name='n2', id_sa='a1', id_sb='b2'),
 Row(name='n3', id_sa='a1', id_sb='b2'),
 Row(name='n4', id_sa='a2', id_sb='b2')]

My goal is to produce a mapping from a1 to b2. Indeed, the names associated with a1 are n1, n2 and n3, which map respectively to b1, b2 and b2, so b2 is the most frequent mapping among the names associated with a1. In the same way, a2 will be mapped to b2. It's OK to assume that there will always be a winner: no need to break ties.

I was hoping that I could use groupBy(df.id_sa) on my DataFrame, but I don't know what to do next. I'm looking for an aggregation that could produce, in the end, the following rows:

[Row(id_sa='a1', max_id_sb='b2'),
 Row(id_sa='a2', max_id_sb='b2')]

But maybe I'm trying to use the wrong tool and I should just go back to using RDDs.

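For reference, here is a minimal sketch of how the example rows could be loaded into a DataFrame named df (the name the answers below assume); spark is assumed to be the usual SparkSession entry point:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Example data from the question; the answers below refer to this DataFrame as df.
df = spark.createDataFrame(
    [("n1", "a1", "b1"),
     ("n2", "a1", "b2"),
     ("n3", "a1", "b2"),
     ("n4", "a2", "b2")],
    ["name", "id_sa", "id_sb"])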

2 Solutions

#1 (score: 28)

Using join (in case of ties it will return more than one row per group):

import pyspark.sql.functions as F
from pyspark.sql.functions import count, col

# Count how many times each (id_sa, id_sb) pair occurs.
cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts")
# For each id_sa, find the highest pair count.
maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs")

# Keep the pairs whose count equals the per-id_sa maximum.
cnts.join(maxs,
  (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
).select(col("cnts.id_sa"), col("cnts.id_sb"))
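On the example data this yields (a1, b2) and (a2, b2). If the end goal is an in-memory Python dict from id_sa to id_sb, one possible way to materialise it (the names best and mapping are just placeholders, and this only makes sense if the distinct id_sa values fit in driver memory) is a sketch along these lines:

best = cnts.join(maxs,
  (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
).select(col("cnts.id_sa"), col("cnts.id_sb"))

# Collect to the driver only if the number of distinct id_sa values is small enough.
mapping = {row["id_sa"]: row["id_sb"] for row in best.collect()}
# {'a1': 'b2', 'a2': 'b2'} on the example data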

Using window functions (will drop ties):

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Rank the (id_sa, id_sb) pairs within each id_sa by descending count and keep the top one.
w = Window.partitionBy("id_sa").orderBy(col("cnt").desc())

(cnts
  .withColumn("rn", row_number().over(w))
  .where(col("rn") == 1)
  .select("id_sa", "id_sb"))

Using struct ordering:

from pyspark.sql.functions import struct

(cnts
  .groupBy("id_sa")
  .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
  .select(col("id_sa"), col("max.id_sb")))

See also How to select the first row of each group?

#2 (score: 3)

I think what you might be looking for are window functions: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.Window

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

Here is an example in Scala (I don't have a Spark Shell with Hive available right now, so I was not able to test the code, but I think it should work):

case class MyRow(name: String, id_sa: String, id_sb: String)

// Build the example DataFrame (assumes the spark-shell implicits needed for toDF are in scope).
val myDF = sc.parallelize(Array(
    MyRow("n1", "a1", "b1"),
    MyRow("n2", "a1", "b2"),
    MyRow("n3", "a1", "b2"),
    MyRow("n4", "a2", "b2")
)).toDF("name", "id_sa", "id_sb")

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.first

// For each id_sa partition, order by id_sb descending and keep the rows whose
// id_sb equals the first (i.e. largest) id_sb in that ordering.
val windowSpec = Window.partitionBy(myDF("id_sa")).orderBy(myDF("id_sb").desc)

myDF.withColumn("max_id_sb", first(myDF("id_sb")).over(windowSpec)).filter("id_sb = max_id_sb")

There are probably more efficient ways to achieve the same results with Window functions, but I hope this points you in the right direction.
