Spark Scala: скользящая средняя для нескольких столбцов


Ввод:

val customers = sc.parallelize(List(("Alice", "2016-05-01", 50.00,4),
                                ("Alice", "2016-05-03", 45.00,2),
                                ("Alice", "2016-05-04", 55.00,4),
                                ("Bob", "2016-05-01", 25.00,6),
                                ("Bob", "2016-05-04", 29.00,7),
                                ("Bob", "2016-05-06", 27.00,10))).
                           toDF("name", "date", "amountSpent","NumItems")

Процедура:

 // Import the window functions.
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions._

 // Create a window spec.
 val wSpec1 = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)

В этом окне спецификации данные секционируются клиентом. Данные каждого клиента упорядочены по дате. Кроме того, рамка окна определяется как начинающаяся с -1 (одна строка перед текущей строкой) и заканчивающаяся на 1 (одна строка после текущей строки), в общей сложности 3 строки в скользящем окне. Задача состоит в том, чтобы взять оконное суммирование для списка столбцов. В данном случае это "amountSpent", "NumItems". Но ... проблема может иметь до сотни столбцов.

Ниже приводится решение для выполнения оконного суммирования для каждого столбца. Однако, Как выполнить суммирование более эффективно? потому что нам не нужно каждый раз находить строки скользящего окна для каждого столбца.

 // Calculate the sum of spent
 customers.withColumn("sumSpent",sum(customers("amountSpent")).over(wSpec1)).show()

  +-----+----------+-----------+--------+--------+
  | name|      date|amountSpent|NumItems|sumSpent|
  +-----+----------+-----------+--------+--------+
  |Alice|2016-05-01|       50.0|       4|    95.0|
  |Alice|2016-05-03|       45.0|       2|   150.0|
  |Alice|2016-05-04|       55.0|       4|   100.0|
  |  Bob|2016-05-01|       25.0|       6|    54.0|
  |  Bob|2016-05-04|       29.0|       7|    81.0|
  |  Bob|2016-05-06|       27.0|      10|    56.0|
  +-----+----------+-----------+--------+--------+

 // Calculate the sum of items
 customers.withColumn( "sumItems",
                sum(customers("NumItems")).over(wSpec1)  ).show()

  +-----+----------+-----------+--------+--------+
  | name|      date|amountSpent|NumItems|sumItems|
  +-----+----------+-----------+--------+--------+
  |Alice|2016-05-01|       50.0|       4|       6|
  |Alice|2016-05-03|       45.0|       2|      10|
  |Alice|2016-05-04|       55.0|       4|       6|
  |  Bob|2016-05-01|       25.0|       6|      13|
  |  Bob|2016-05-04|       29.0|       7|      23|
  |  Bob|2016-05-06|       27.0|      10|      17|
  +-----+----------+-----------+--------+--------+
2   2  

2 ответа:

В настоящее время, я думаю, невозможно обновить несколько столбцов с помощью функции окна. Вы можете действовать так, как будто это происходит в то же время, что и ниже

val customers = sc.parallelize(List(("Alice", "2016-05-01", 50.00,4),
  ("Alice", "2016-05-03", 45.00,2),
  ("Alice", "2016-05-04", 55.00,4),
  ("Bob", "2016-05-01", 25.00,6),
  ("Bob", "2016-05-04", 29.00,7),
  ("Bob", "2016-05-06", 27.00,10))).
  toDF("name", "date", "amountSpent","NumItems")

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Create a window spec.
val wSpec1 = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)
var tempdf = customers
val colNames = List("amountSpent", "NumItems")
for(column <- colNames){
  tempdf = tempdf.withColumn(column+"Sum", sum(tempdf(column)).over(wSpec1))
}
tempdf.show(false)

У вас должен быть вывод в виде

+-----+----------+-----------+--------+--------------+-----------+
|name |date      |amountSpent|NumItems|amountSpentSum|NumItemsSum|
+-----+----------+-----------+--------+--------------+-----------+
|Bob  |2016-05-01|25.0       |6       |54.0          |13         |
|Bob  |2016-05-04|29.0       |7       |81.0          |23         |
|Bob  |2016-05-06|27.0       |10      |56.0          |17         |
|Alice|2016-05-01|50.0       |4       |95.0          |6          |
|Alice|2016-05-03|45.0       |2       |150.0         |10         |
|Alice|2016-05-04|55.0       |4       |100.0         |6          |
+-----+----------+-----------+--------+--------------+-----------+

Да, можно вычислить окно только один раз (если у вас есть Spark 2, который позволяет использовать collect_list со struct-типами), предполагая, что у вас есть фрейм данных и windowSpec, как в вашем коде, то:

val colNames = List("amountSpent","NumItems")
val cols= colNames.map(col(_))

// put window-content of all columns in one struct
val df_wc_arr = customers
.withColumn("window_content_arr",collect_list(struct(cols:_*)).over(wSpec1))

// calculate sum of window-content for each column
// aggregation exression used later
val aggExpr = colNames.map(n => sum(col("window_content."+n)).as(n+"Sum"))

df_wc_arr
.withColumn("window_content",explode($"window_content_arr"))
.drop($"window_content_arr")
.groupBy(($"name" :: $"date" :: cols):_*)
.agg(aggExpr.head,aggExpr.tail:_*)
.orderBy($"name",$"date")
.show

Дает

+-----+----------+-----------+--------+--------------+-----------+
| name|      date|amountSpent|NumItems|amountSpentSum|NumItemsSum|
+-----+----------+-----------+--------+--------------+-----------+
|Alice|2016-05-01|       50.0|       4|          95.0|          6|
|Alice|2016-05-03|       45.0|       2|         150.0|         10|
|Alice|2016-05-04|       55.0|       4|         100.0|          6|
|  Bob|2016-05-01|       25.0|       6|          54.0|         13|
|  Bob|2016-05-04|       29.0|       7|          81.0|         23|
|  Bob|2016-05-06|       27.0|      10|          56.0|         17|
+-----+----------+-----------+--------+--------------+-----------+