Добавить сумму столбцов в качестве нового столбца в PySpark dataframe


Я использую PySpark, и у меня есть фрейм данных Spark с кучей числовых столбцов. Я хочу добавить столбец, который является суммой всех остальных столбцов.

Предположим, что в моем фрейме данных есть столбцы "a", "b"и " c". Я знаю, что могу это сделать:
df.withColumn('total_col', df.a + df.b + df.c)
Проблема в том, что я не хочу печатать каждый столбец по отдельности и добавлять их, особенно если у меня много столбцов. Я хочу иметь возможность сделать это автоматически или указав список имен столбцов, которые я хочу добавить. Есть другой способ сделать это?
2 16

2 ответа:

Это не было очевидно. Я вижу, что нет построчной суммой столбцов, определенных в СПАРК таблицы данных по API.

Версия 2

Это можно сделать довольно простым способом:

newdf = df.withColumn('total', sum(df[col] for col in df.columns))

df.columns поставляется pyspark в виде списка строк, дающих все имена столбцов в фрейме данных Spark. Для другой суммы вместо этого можно указать любой другой список имен столбцов.

Я не стал пробовать это в качестве первого решения, потому что не был уверен, как оно будет себя вести. Но это завод.

Версия 1

Это слишком сложно, но тоже работает.

Вы можете сделать это:

  1. используйте df.columns, чтобы получить список имен столбцов
  2. используйте этот список имен, чтобы составить список столбцов
  3. передайте этот список чему-то, что вызовет перегруженную функцию add столбца функциональным способом типа fold

С помощью python'S reduce , некоторые знания о том, как работает перегрузка операторов, и PySpark код для столбцов здесь , который становится:

def column_add(a,b):
     return  a.__add__(b)

newdf = df.withColumn('total_col', 
         reduce(column_add, ( df[col] for col in df.columns ) ))

Обратите внимание, что это python reduce, а не spark RDD reduce, и термин скобки во втором параметре для уменьшения требует скобки, потому что это выражение генератора списков.

Проверено, Работает!

$ pyspark
>>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache()
>>> df
DataFrame[a: bigint, b: bigint, c: bigint]
>>> df.columns
['a', 'b', 'c']
>>> def column_add(a,b):
...     return a.__add__(b)
...
>>> df.withColumn('total', reduce(column_add, ( df[col] for col in df.columns ) )).collect()
[Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)]

Моя задача была аналогична вышеописанной (немного сложнее), поскольку я должен был добавить последовательные суммы столбцов в качестве новых столбцов в PySpark dataframe. Этот подход использует код из версии 1 Павла выше:

import pyspark
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.appName('addColAsCumulativeSUM').getOrCreate()
df=spark.createDataFrame(data=[(1,2,3),(4,5,6),(3,2,1)\
                              ,(6,1,-4),(0,2,-2),(6,4,1)\
                              ,(4,5,2),(5,-3,-5),(6,4,-1)]\
                              ,schema=['x1','x2','x3'])
df.show()

+---+---+---+
| x1| x2| x3|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
|  3|  2|  1|
|  6|  1| -4|
|  0|  2| -2|
|  6|  4|  1|
|  4|  5|  2|
|  5| -3| -5|
|  6|  4| -1|
+---+---+---+

colnames=df.columns

Добавьте новые столбцы, которые являются кумулятивными суммами (последовательными):

for i in range(0,len(colnames)):
    colnameLst= colnames[0:i+1]
    colname = 'cm'+ str(i+1)
    df = df.withColumn(colname, sum(df[col] for col in colnameLst))

ДФ.show ()

+---+---+---+---+---+---+
| x1| x2| x3|cm1|cm2|cm3|
+---+---+---+---+---+---+
|  1|  2|  3|  1|  3|  6|
|  4|  5|  6|  4|  9| 15|
|  3|  2|  1|  3|  5|  6|
|  6|  1| -4|  6|  7|  3|
|  0|  2| -2|  0|  2|  0|
|  6|  4|  1|  6| 10| 11|
|  4|  5|  2|  4|  9| 11|
|  5| -3| -5|  5|  2| -3|
|  6|  4| -1|  6| 10|  9|
+---+---+---+---+---+---+

Добавлены следующие столбцы "кумулятивная сумма":

cm1 = x1
cm2 = x1 + x2
cm3 = x1 + x2 + x3