This is probably easiest to explain through example. Suppose I have a DataFrame of user logins to a website, for instance:
scala> df.show(5)
+----------------+----------+
|       user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
|  OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows
I would like to add a column indicating when each user became an active user of the site. But there is one caveat: there is a time period during which a user is considered active, and after this period elapses, if they log in again, their became_active date resets. Suppose this period is 5 days. Then the desired table derived from the above would look like this:
+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-11|   2012-01-11|
+----------------+----------+-------------+
So, in particular, SirChillingtonIV's became_active date was reset because their second login came after the active period expired, but Booooooo99900098's became_active date was not reset on their second login, because it fell within the active period.
My initial thought was to use window functions with lag, and then use the lagged values to fill the became_active column; for instance, something starting roughly like:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// For each user's logins, ordered by date, pull the previous login_date
val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))
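For the five sample rows above, df2 comes out like this (ordered by user for readability); tmp is null on each user's first login:
scala> df2.orderBy("user_name", "login_date").show()
+----------------+----------+----------+
|       user_name|login_date|       tmp|
+----------------+----------+----------+
|Booooooo99900098|2012-01-04|      null|
|Booooooo99900098|2012-01-06|2012-01-04|
|  OprahWinfreyJr|2012-01-10|      null|
|SirChillingtonIV|2012-01-04|      null|
|SirChillingtonIV|2012-01-11|2012-01-04|
+----------------+----------+----------+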
Then, the rule to fill in the became_active date would be: if tmp is null (i.e., if it's the user's first ever login) or if login_date - tmp >= 5, then became_active = login_date; otherwise, go to the next most recent value in tmp and apply the same rule. This suggests a recursive approach, which I'm having trouble imagining a way to implement.
My questions: Is this a viable approach, and if so, how can I "go back" and look at earlier values of tmp until I find one where I stop? To my knowledge, I can't iterate through the values of a Spark SQL Column. Is there another way to achieve this result?
1 Answer
#1
Here is the trick. Import a bunch of functions:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}
// The $"..." column syntax and .toDF below also need the session implicits
// (pre-imported in spark-shell; otherwise: import spark.implicits._)
Define windows:
val userWindow = Window.partitionBy("user_name").orderBy("login_date") // per-user login sequence
val userSessionWindow = Window.partitionBy("user_name", "session")     // one frame per user session
Find the points where new sessions start:
// A login starts a new session when more than 5 days have passed since the
// previous login (coalesce treats each user's first login as a gap of 0);
// the running sum then turns these 0/1 flags into a per-user session index
val newSession = (coalesce(
  datediff($"login_date", lag($"login_date", 1).over(userWindow)),
  lit(0)
) > 5).cast("bigint")

val sessionized = df.withColumn("session", sum(newSession).over(userWindow))
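For the sample dataset defined below, the intermediate session column comes out like this (illustrative output, ordered for readability):
scala> sessionized.orderBy("user_name", "login_date").show()
+----------------+----------+-------+
|       user_name|login_date|session|
+----------------+----------+-------+
|Booooooo99900098|2012-01-04|      0|
|Booooooo99900098|2012-01-06|      0|
|  OprahWinfreyJr|2012-01-10|      0|
|SirChillingtonIV|2012-01-04|      0|
|SirChillingtonIV|2012-01-11|      1|
|SirChillingtonIV|2012-01-14|      1|
|SirChillingtonIV|2012-08-11|      2|
+----------------+----------+-------+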
Find the earliest date per session:
val result = sessionized
  .withColumn("became_active", min($"login_date").over(userSessionWindow))
  .drop("session")
With the dataset defined as:
val df = Seq(
  ("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"),
  ("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"),
  ("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"),
  ("SirChillingtonIV", "2012-08-11")
).toDF("user_name", "login_date")
The result is:
+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-04|   2012-01-04| <- the user's first session
|SirChillingtonIV|2012-01-11|   2012-01-11| <- the second session
|SirChillingtonIV|2012-01-14|   2012-01-11|
|SirChillingtonIV|2012-08-11|   2012-08-11| <- the third session
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
+----------------+----------+-------------+
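For completeness, the same gaps-and-islands logic can be written in raw Spark SQL. The following is a sketch, not part of the original answer; it assumes a SparkSession named spark is in scope and registers df as a temp view called logins:
df.createOrReplaceTempView("logins")

val resultSql = spark.sql("""
  SELECT user_name, login_date,
         -- every row in a session shares that session's earliest login_date
         MIN(login_date) OVER (PARTITION BY user_name, session) AS became_active
  FROM (
    SELECT *,
           -- running sum of the 0/1 flags assigns a session index
           SUM(new_session) OVER (PARTITION BY user_name ORDER BY login_date) AS session
    FROM (
      SELECT *,
             -- flag logins that come more than 5 days after the previous one
             CAST(COALESCE(DATEDIFF(login_date,
                    LAG(login_date, 1) OVER (PARTITION BY user_name ORDER BY login_date)),
                  0) > 5 AS BIGINT) AS new_session
      FROM logins
    ) flagged
  ) sessionized
""")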