Добавить сумму столбцов в качестве нового столбца в PySpark dataframe
Я использую PySpark, и у меня есть фрейм данных Spark с кучей числовых столбцов. Я хочу добавить столбец, который является суммой всех остальных столбцов.
Предположим, что в моем фрейме данных есть столбцы "a", "b"и " c". Я знаю, что могу это сделать:df.withColumn('total_col', df.a + df.b + df.c)
Проблема в том, что я не хочу печатать каждый столбец по отдельности и добавлять их, особенно если у меня много столбцов. Я хочу иметь возможность сделать это автоматически или указав список имен столбцов, которые я хочу добавить. Есть другой способ сделать это?2 ответа:
Это не было очевидно. Я вижу, что нет построчной суммой столбцов, определенных в СПАРК таблицы данных по API.
Версия 2
Это можно сделать довольно простым способом:
newdf = df.withColumn('total', sum(df[col] for col in df.columns))
Я не стал пробовать это в качестве первого решения, потому что не был уверен, как оно будет себя вести. Но это завод.
df.columns
поставляется pyspark в виде списка строк, дающих все имена столбцов в фрейме данных Spark. Для другой суммы вместо этого можно указать любой другой список имен столбцов.Версия 1
Это слишком сложно, но тоже работает.Вы можете сделать это:
- используйте
df.columns
, чтобы получить список имен столбцов- используйте этот список имен, чтобы составить список столбцов
- передайте этот список чему-то, что вызовет перегруженную функцию add столбца функциональным способом типа fold
С помощью python'S reduce , некоторые знания о том, как работает перегрузка операторов, и PySpark код для столбцов здесь , который становится:
def column_add(a,b): return a.__add__(b) newdf = df.withColumn('total_col', reduce(column_add, ( df[col] for col in df.columns ) ))
Обратите внимание, что это python reduce, а не spark RDD reduce, и термин скобки во втором параметре для уменьшения требует скобки, потому что это выражение генератора списков.
Проверено, Работает!
$ pyspark >>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache() >>> df DataFrame[a: bigint, b: bigint, c: bigint] >>> df.columns ['a', 'b', 'c'] >>> def column_add(a,b): ... return a.__add__(b) ... >>> df.withColumn('total', reduce(column_add, ( df[col] for col in df.columns ) )).collect() [Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)]
Моя задача была аналогична вышеописанной (немного сложнее), поскольку я должен был добавить последовательные суммы столбцов в качестве новых столбцов в PySpark dataframe. Этот подход использует код из версии 1 Павла выше:
import pyspark from pyspark.sql import SparkSession import pandas as pd spark = SparkSession.builder.appName('addColAsCumulativeSUM').getOrCreate() df=spark.createDataFrame(data=[(1,2,3),(4,5,6),(3,2,1)\ ,(6,1,-4),(0,2,-2),(6,4,1)\ ,(4,5,2),(5,-3,-5),(6,4,-1)]\ ,schema=['x1','x2','x3']) df.show() +---+---+---+ | x1| x2| x3| +---+---+---+ | 1| 2| 3| | 4| 5| 6| | 3| 2| 1| | 6| 1| -4| | 0| 2| -2| | 6| 4| 1| | 4| 5| 2| | 5| -3| -5| | 6| 4| -1| +---+---+---+ colnames=df.columns
Добавьте новые столбцы, которые являются кумулятивными суммами (последовательными):
for i in range(0,len(colnames)): colnameLst= colnames[0:i+1] colname = 'cm'+ str(i+1) df = df.withColumn(colname, sum(df[col] for col in colnameLst))
ДФ.show ()
+---+---+---+---+---+---+ | x1| x2| x3|cm1|cm2|cm3| +---+---+---+---+---+---+ | 1| 2| 3| 1| 3| 6| | 4| 5| 6| 4| 9| 15| | 3| 2| 1| 3| 5| 6| | 6| 1| -4| 6| 7| 3| | 0| 2| -2| 0| 2| 0| | 6| 4| 1| 6| 10| 11| | 4| 5| 2| 4| 9| 11| | 5| -3| -5| 5| 2| -3| | 6| 4| -1| 6| 10| 9| +---+---+---+---+---+---+
Добавлены следующие столбцы "кумулятивная сумма":
cm1 = x1 cm2 = x1 + x2 cm3 = x1 + x2 + x3