Функция Spark SQL window со сложным условием
Это, вероятно, легче всего объяснить на примере. Предположим, у меня есть фрейм данных логинов пользователей на веб-сайте, например:
scala> df.show(5)
+----------------+----------+
| user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
| OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows
Я хотел бы добавить к этому колонку, указывающую, когда они стали активными пользователями на сайте. Но есть один нюанс: существует период времени, в течение которого пользователь считается активным, и после этого периода, если он снова войдет в систему, его дата became_active
сбрасывается. Предположим, что этот период равен 5 дням . Тогда искомая таблица выводится из вышеизложенного таблица будет выглядеть примерно так:
+----------------+----------+-------------+
| user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-06| 2012-01-04|
| OprahWinfreyJr|2012-01-10| 2012-01-10|
|SirChillingtonIV|2012-01-11| 2012-01-11|
+----------------+----------+-------------+
Так, в частности, дата SirChillingtonIV became_active
была сброшена, потому что их второй логин пришел после истечения активного периода, но дата Booooooo99900098 became_active
не была сброшена во второй раз, когда он/она вошел в систему, потому что она попала в активный период.
Моей первоначальной мыслью было использовать оконные функции с lag
, а затем использовать значения lag
ged для заполнения столбца became_active
; например, что-то начинающееся примерно например:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))
Тогда правилом для заполнения даты became_active
будет, если tmp
является null
(то есть, если это первый вход в систему) или если login_date - tmp >= 5
, то became_active = login_date
; в противном случае перейдите к следующему самому последнему значению в tmp
и примените то же правило. Это предполагает рекурсивный подход, который я с трудом представляю себе, как его реализовать.
Мои вопросы: Является ли это жизнеспособным подходом, и если да, то как я могу "вернуться" и посмотреть на более ранние значения tmp
, пока не найду то, где я остановлюсь? Я не могу, чтобы мои знания, перебирайте значения Spark SQL Column
. Есть ли другой способ достичь этого результата?
1 ответ:
Вот в чем фокус. Импортируйте кучу функций:
import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}
Определите окна:
val userWindow = Window.partitionBy("user_name").orderBy("login_date") val userSessionWindow = Window.partitionBy("user_name", "session")
Найдите точки, где начинаются новые сеансы:
val newSession = (coalesce( datediff($"login_date", lag($"login_date", 1).over(userWindow)), lit(0) ) > 5).cast("bigint") val sessionized = df.withColumn("session", sum(newSession).over(userWindow))
Найти самую раннюю дату за сеанс:
val result = sessionized .withColumn("became_active", min($"login_date").over(userSessionWindow)) .drop("session")
С набором данных, определенным как:
val df = Seq( ("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"), ("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"), ("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"), ("SirChillingtonIV", "2012-08-11") ).toDF("user_name", "login_date")
Результат таков:
+----------------+----------+-------------+ | user_name|login_date|became_active| +----------------+----------+-------------+ | OprahWinfreyJr|2012-01-10| 2012-01-10| |SirChillingtonIV|2012-01-04| 2012-01-04| <- The first session for user |SirChillingtonIV|2012-01-11| 2012-01-11| <- The second session for user |SirChillingtonIV|2012-01-14| 2012-01-11| |SirChillingtonIV|2012-08-11| 2012-08-11| <- The third session for user |Booooooo99900098|2012-01-04| 2012-01-04| |Booooooo99900098|2012-01-06| 2012-01-04| +----------------+----------+-------------+