I have a dataframe with a few attributes (C1, C2), an offset (in days), and a few values (V1, V2).
val inputDF = spark.sparkContext
  .parallelize(Seq(
    (1, 2, 30, 100, -1), (1, 2, 30, 100, 0), (1, 2, 30, 100, 1),
    (11, 21, 30, 100, -1), (11, 21, 30, 100, 0), (11, 21, 30, 100, 1)), 10)
  .toDF("c1", "c2", "v1", "v2", "offset")
inputDF: org.apache.spark.sql.DataFrame = [c1: int, c2: int ... 3 more fields]
scala> inputDF.show
+---+---+---+---+------+
| c1| c2| v1| v2|offset|
+---+---+---+---+------+
| 1| 2| 30|100| -1|
| 1| 2| 30|100| 0|
| 1| 2| 30|100| 1|
| 11| 21| 30|100| -1|
| 11| 21| 30|100| 0|
| 11| 21| 30|100| 1|
+---+---+---+---+------+
What I need to do is calculate the cumulative sum of V1 and V2 for each (c1, c2) group, ordered by offset.
I tried the following, but it is far from a generic solution that could work on any data frame.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val groupKey = List("c1", "c2").map(x => col(x.trim))
val orderByKey = List("offset").map(x => col(x.trim))

val w = Window.partitionBy(groupKey: _*).orderBy(orderByKey: _*)

val outputDF = inputDF
  .withColumn("cumulative_v1", sum(inputDF("v1")).over(w))
  .withColumn("cumulative_v2", sum(inputDF("v2")).over(w))
+---+---+---+---+------+-------------+-------------+
| c1| c2| v1| v2|offset|cumulative_v1|cumulative_v2|
+---+---+---+---+------+-------------+-------------+
|  1|  2| 30|100|    -1|           30|          100|
|  1|  2| 30|100|     0|           60|          200|
|  1|  2| 30|100|     1|           90|          300|
| 11| 21| 30|100|    -1|           30|          100|
| 11| 21| 30|100|     0|           60|          200|
| 11| 21| 30|100|     1|           90|          300|
+---+---+---+---+------+-------------+-------------+
The challenge is: [a] I need to do this across multiple, varying offset windows ((-1 to 1), (-10 to 10), (-30 to 30), or any others), and [b] I need to use this across multiple dataframes/datasets, so I'm hoping for a generic function that could work on either an RDD or a Dataset.
Any thoughts on how I could achieve this in Spark 2.0?
Help is much appreciated. Thanks!
2 Answers
#1
Here's a primitive take using just data frames.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val groupKey = List("c1", "c2").map(x => col(x.trim))
val orderByKey = List("offset").map(x => col(x.trim))

val w = Window.partitionBy(groupKey: _*).orderBy(orderByKey: _*)

val inputDF = spark.sparkContext
  .parallelize(Seq(
    (1, 2, 30, 100, -1), (1, 2, 3, 100, -2), (1, 2, 140, 100, 2),
    (1, 2, 30, 100, 0), (1, 2, 30, 100, 1),
    (11, 21, 30, 100, -1), (11, 21, 30, 100, 0), (11, 21, 30, 100, 1)), 10)
  .toDF("c1", "c2", "v1", "v2", "offset")

val outputDF = inputDF
  .withColumn("cumulative_v1", sum(when($"offset".between(-1, 1), inputDF("v1")).otherwise(0)).over(w))
  .withColumn("cumulative_v3", sum(when($"offset".between(-2, 2), inputDF("v1")).otherwise(0)).over(w))
  .withColumn("cumulative_v2", sum(inputDF("v2")).over(w))
This produces cumulative sums of a single 'value' column over different offset windows.
scala> outputDF.show
+---+---+---+---+------+-------------+-------------+-------------+
| c1| c2| v1| v2|offset|cumulative_v1|cumulative_v3|cumulative_v2|
+---+---+---+---+------+-------------+-------------+-------------+
| 1| 2| 3|100| -2| 0| 0| 100|
| 1| 2| 30|100| -1| 30| 30| 200|
| 1| 2| 30|100| 0| 60| 60| 300|
| 1| 2| 30|100| 1| 90| 90| 400|
| 1| 2|140|100| 2| 90| 90| 500|
| 11| 21| 30|100| -1| 30| 30| 100|
| 11| 21| 30|100| 0| 60| 60| 200|
| 11| 21| 30|100| 1| 90| 90| 300|
+---+---+---+---+------+-------------+-------------+-------------+
A couple of drawbacks of this approach: [1] for each conditional window ((-1, 1), (-2, 2), or any (from_offset, to_offset)), sum() needs to be called separately, and [2] this isn't a generic function.
I know Spark accepts a variable list of columns for aggregate functions, like this:
val exprs = Map("v1" -> "sum", "v2" -> "sum")
But I'm unsure how to extend this to window functions with variable conditions. I'm still curious whether there is a better, modular/reusable function we could write to solve this.
#2
Another generic way to solve this would be with a foldLeft, as explained here: https://*.com/a/44532867/7059145
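For what it's worth, here is a minimal sketch of that foldLeft idea, assuming the same inputDF as above and that spark.implicits._ is in scope (as it is in spark-shell); the (name, source, from, to) specs are made up for illustration and are not copied from the linked answer:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Same window as before: partition by the group keys, order by offset.
val w = Window.partitionBy($"c1", $"c2").orderBy($"offset")

// Hypothetical specs: (output column name, source column, from offset, to offset).
val specs = Seq(
  ("cumulative_v1", "v1", -1, 1),
  ("cumulative_v3", "v1", -2, 2)
)

// Fold over the specs, adding one conditional cumulative-sum column per spec.
val outputDF = specs.foldLeft(inputDF) { case (df, (name, src, from, to)) =>
  df.withColumn(name, sum(when($"offset".between(from, to), col(src)).otherwise(0)).over(w))
}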