Режим группирования данных в (py)Spark
У меня есть фрейм данных spark с несколькими столбцами. Я хотел бы сгруппировать строки на основе одного столбца, а затем найти режим второго столбца для каждой группы. Работая с фреймом данных pandas, я бы сделал что-то вроде этого:
rand_values = np.random.randint(max_value,
size=num_values).reshape((num_values/2, 2))
rand_values = pd.DataFrame(rand_values, columns=['x', 'y'])
rand_values['x'] = rand_values['x'] > max_value/2
rand_values['x'] = rand_values['x'].astype('int32')
print(rand_values)
## x y
## 0 0 0
## 1 0 4
## 2 0 1
## 3 1 1
## 4 1 2
def mode(series):
return scipy.stats.mode(series['y'])[0][0]
rand_values.groupby('x').apply(mode)
## x
## 0 4
## 1 1
## dtype: int64
В пределах pyspark я могу найти режим работы одного столбца
df = sql_context.createDataFrame(rand_values)
def mode_spark(df, column):
# Group by column and count the number of occurrences
# of each x value
counts = df.groupBy(column).count()
# - Find the maximum value in the 'counts' column
# - Join with the counts dataframe to select the row
# with the maximum count
# - Select the first element of this dataframe and
# take the value in column
mode = counts.join(
counts.agg(F.max('count').alias('count')),
on='count'
).limit(1).select(column)
return mode.first()[column]
mode_spark(df, 'x')
## 1
mode_spark(df, 'y')
## 1
Я не знаю, как применить эту функцию к сгруппированным данным. Если невозможно напрямую применить эту логику к фрейму данных, то можно ли достичь тот же эффект каким-то другим способом?
Заранее благодарю вас!
1 ответ:
Решение, предложенное zero323.
Оригинальное решение: https://stackoverflow.com/a/35226857/1560062
Во-первых, подсчитайте случаи возникновения каждой комбинации (x, y).counts = df.groupBy(['x', 'y']).count().alias('counts') counts.show() ## +---+---+-----+ ## | x| y|count| ## +---+---+-----+ ## | 0| 1| 2| ## | 0| 3| 2| ## | 0| 4| 2| ## | 1| 1| 3| ## | 1| 3| 1| ## +---+---+-----+
Решение 1: сгруппировать по "x", агрегировать, взяв максимальное значение отсчетов в каждой группе. Наконец, отбросьте колонку "количество".
result = (counts .groupBy('x') .agg(F.max(F.struct(F.col('count'), F.col('y'))).alias('max')) .select(F.col('x'), F.col('max.y')) ) result.show() ## +---+---+ ## | x| y| ## +---+---+ ## | 0| 4| ## | 1| 1| ## +---+---+
Решение 2: Использование окна, разбиение по " x "и порядок по столбцу "count". Теперь выберите первый ряд в каждом из перекрытия.
win = Window().partitionBy('x').orderBy(F.col('count').desc()) result = (counts .withColumn('row_num', F.rowNumber().over(win)) .where(F.col('row_num') == 1) .select('x', 'y') ) result.show() ## +---+---+ ## | x| y| ## +---+---+ ## | 0| 1| ## | 1| 1| ## +---+---+
Два результата имеют различный результат из-за способа сортировки строк. Если связей нет, то оба метода дают один и тот же результат.