I'm trying to use Spark DataFrames instead of RDDs, since they are higher-level than RDDs and tend to produce more readable code, but I would be more than happy to get suggestions for something more idiomatic for the task at hand.
In a 14-node Google Dataproc cluster, I have about 6 million names that are translated to IDs by two different systems: sa and sb. Each Row contains name, id_sa and id_sb. My goal is to produce a mapping from id_sa to id_sb such that, for each id_sa, the corresponding id_sb is the most frequent ID among all names attached to that id_sa.
Let's try to clarify with an example. If I have the following rows:
[Row(name='n1', id_sa='a1', id_sb='b1'),
 Row(name='n2', id_sa='a1', id_sb='b2'),
 Row(name='n3', id_sa='a1', id_sb='b2'),
 Row(name='n4', id_sa='a2', id_sb='b2')]
My goal is to produce a mapping from a1 to b2. Indeed, the names associated with a1 are n1, n2 and n3, which map respectively to b1, b2 and b2, so b2 is the most frequent mapping among the names associated with a1. In the same way, a2 will be mapped to b2. It's OK to assume that there will always be a winner: no need to break ties.
I was hoping that I could use groupBy(df.id_sa) on my DataFrame, but I don't know what to do next. I was hoping for an aggregation that could produce, in the end, the following rows:
[Row(id_sa='a1', max_id_sb='b2'),
 Row(id_sa='a2', max_id_sb='b2')]
But maybe I'm trying to use the wrong tool and I should just go back to using RDDs.
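For reference, the sample rows above could be set up as a DataFrame roughly like this (just a sketch for reproducing the example; it assumes an active SparkSession named spark, and my real data is of course loaded differently):

from pyspark.sql import Row

rows = [Row(name='n1', id_sa='a1', id_sb='b1'),
        Row(name='n2', id_sa='a1', id_sb='b2'),
        Row(name='n3', id_sa='a1', id_sb='b2'),
        Row(name='n4', id_sa='a2', id_sb='b2')]

# spark is assumed to be an existing SparkSession
# (on older Spark, sqlContext.createDataFrame works the same way)
df = spark.createDataFrame(rows)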
2 Answers
#1 (28 votes)
Using join (this will return more than one row per group in case of ties):
import pyspark.sql.functions as F
from pyspark.sql.functions import count, col

# Count occurrences of each (id_sa, id_sb) pair; alias the DataFrame for the join below
cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts")

# For each id_sa, find the highest pair count
maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs")

# Keep only the (id_sa, id_sb) pairs whose count equals that maximum
cnts.join(maxs,
    (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
).select(col("cnts.id_sa"), col("cnts.id_sb"))
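For what it's worth, on the question's sample data appending .show() to the expression above should print one row per id_sa (extra rows appear only when counts tie), roughly along the lines of:

+-----+-----+
|id_sa|id_sb|
+-----+-----+
|   a1|   b2|
|   a2|   b2|
+-----+-----+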
Using window functions (will drop ties):
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
w = Window.partitionBy("id_sa").orderBy(col("cnt").desc())

(cnts
    .withColumn("rn", row_number().over(w))
    .where(col("rn") == 1)
    .select("id_sa", "id_sb"))
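If deterministic tie-breaking matters, a secondary sort key can be added to the window specification; the choice of id_sb as the extra key below is just an illustration, not part of the original answer:

# Break count ties deterministically by falling back to id_sb ordering
w = Window.partitionBy("id_sa").orderBy(col("cnt").desc(), col("id_sb"))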
Using struct ordering:
from pyspark.sql.functions import struct
(cnts
    .groupBy("id_sa")
    .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
    .select(col("id_sa"), col("max.id_sb")))
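This works because Spark orders structs field by field in declaration order, so F.max over struct(cnt, id_sb) keeps the pair with the highest count (and, among equal counts, the largest id_sb). If the end goal is a plain Python dict from id_sa to id_sb, a small sketch like the one below should do it; collecting is only reasonable if the number of distinct id_sa values fits comfortably in driver memory:

# Sketch: turn the aggregated winners into an in-memory mapping
best = (cnts
    .groupBy("id_sa")
    .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
    .select(col("id_sa"), col("max.id_sb").alias("id_sb")))

mapping = {row["id_sa"]: row["id_sb"] for row in best.collect()}
# For the question's sample data: {'a1': 'b2', 'a2': 'b2'}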
See also How to select the first row of each group?
#2 (3 votes)
I think what you might be looking for are window functions: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.Window
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
Here is an example in Scala (I don't have a Spark Shell with Hive available right now, so I was not able to test the code, but I think it should work):
case class MyRow(name: String, id_sa: String, id_sb: String)

// toDF on an RDD of case classes needs the SQL implicits (imported by default in spark-shell)
val myDF = sc.parallelize(Array(
  MyRow("n1", "a1", "b1"),
  MyRow("n2", "a1", "b2"),
  MyRow("n3", "a1", "b2"),
  MyRow("n1", "a2", "b2")
)).toDF("name", "id_sa", "id_sb")

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.first

val windowSpec = Window.partitionBy(myDF("id_sa")).orderBy(myDF("id_sb").desc)

// Name the column "max_id_sb" so the filter below can reference it
myDF
  .withColumn("max_id_sb", first(myDF("id_sb")).over(windowSpec))
  .filter("id_sb = max_id_sb")
There are probably more efficient ways to achieve the same results with Window functions, but I hope this points you in the right direction.