PySpark-потеря строковых значений при создании пар значений ключей


Мне нужно создать пары значений ключей для каждой строки в фрейме данных / RDD. То есть каждый человек станет ключом для каждой строки, а связанная с ними транзакция-это список, который становится значением.

У меня есть следующий пример, чтобы проиллюстрировать мою проблему:

a = [
    ('Bob', 562,"Food", "12 May 2018"),
    ('Bob',880,"Food","01 June 2018"),
    ('Bob',380,'Household'," 16 June 2018"),
    ('Sue',85,'Household'," 16 July 2018"),
    ('Sue',963,'Household'," 16 Sept 2018")
] 
df = spark.createDataFrame(a, ["Person", "Amount","Budget", "Date"])

Затем я создаю функцию для выполнения пары значений ключей для каждой строки

def make_keys_and_value(row):
    """ Convert the dataframe rows into key value pairs

    """
    return (row["Person"], [[row["Amount"], row["Budget"],
                                 row["Date"]]])
person_summarries_rdd = df.rdd.map(lambda row : make_keys_and_value(row))
Однако, когда я хочу показать результаты, Budget и Date становятся нулями. Я думаю, это связано с тем, что они струнные ценности.
person_summarries_rdd.toDF().show(5,False)
+---+-------------------------------+
|_1 |_2                             |
+---+-------------------------------+
|Bob|[WrappedArray(562, null, null)]|
|Bob|[WrappedArray(880, null, null)]|
|Bob|[WrappedArray(380, null, null)]|
|Sue|[WrappedArray(85, null, null)] |
|Sue|[WrappedArray(963, null, null)]|
+---+-------------------------------+

Мне нужно сохранить значения строк, все еще используя этот метод.

1 2

1 ответ:

Нет необходимости сериализоваться в rdd. Вы можете использовать pyspark.sql.functions.struct():

import pyspark.sql.function as f
df.withColumn('values', f.struct(f.col('Amount'), f.col('Budget'), f.col('Date')))\
    .select('Person', 'values').show(truncate=False)
#+------+-----------------------------+
#|Person|values                       |
#+------+-----------------------------+
#|Bob   |[562,Food,12 May 2018]       |
#|Bob   |[880,Food,01 June 2018]      |
#|Bob   |[380,Household, 16 June 2018]|
#|Sue   |[85,Household, 16 July 2018] |
#|Sue   |[963,Household, 16 Sept 2018]|
#+------+-----------------------------+

Или используя понимание списка:

array_columns = [c for c in df.columns if c != 'Person']
df.withColumn('values', f.struct(*[f.col(c) for c in array_columns]))\
    .select('Person', 'values').show(truncate=False)
#+------+-----------------------------+
#|Person|values                       |
#+------+-----------------------------+
#|Bob   |[562,Food,12 May 2018]       |
#|Bob   |[880,Food,01 June 2018]      |
#|Bob   |[380,Household, 16 June 2018]|
#|Sue   |[85,Household, 16 July 2018] |
#|Sue   |[963,Household, 16 Sept 2018]|
#+------+-----------------------------+

Ваш код не работает, потому что вы не можете иметь смешанные типы в WrappedArray(). Spark выводит тип из первого элемента (Amount).

Вы можете либо привести Amount к str:

def make_keys_and_value(row):
    """ Convert the dataframe rows into key value pairs

    """
    return (row["Person"], [[str(row["Amount"]), row["Budget"],
                                 row["Date"]]])
person_summarries_rdd = df.rdd.map(lambda row : make_keys_and_value(row))
person_summarries_rdd.toDF().show(truncate=False)
#+---+---------------------------------------------+
#|_1 |_2                                           |
#+---+---------------------------------------------+
#|Bob|[WrappedArray(562, Food, 12 May 2018)]       |
#|Bob|[WrappedArray(880, Food, 01 June 2018)]      |
#|Bob|[WrappedArray(380, Household,  16 June 2018)]|
#|Sue|[WrappedArray(85, Household,  16 July 2018)] |
#|Sue|[WrappedArray(963, Household,  16 Sept 2018)]|
#+---+---------------------------------------------+

Или используйте tuple вместо list:

def make_keys_and_value(row):
    """ Convert the dataframe rows into key value pairs

    """
    return (row["Person"], ((row["Amount"]), row["Budget"],
                                 row["Date"]))
person_summarries_rdd = df.rdd.map(lambda row : make_keys_and_value(row))
#+---+-----------------------------+
#|_1 |_2                           |
#+---+-----------------------------+
#|Bob|[562,Food,12 May 2018]       |
#|Bob|[880,Food,01 June 2018]      |
#|Bob|[380,Household, 16 June 2018]|
#|Sue|[85,Household, 16 July 2018] |
#|Sue|[963,Household, 16 Sept 2018]|
#+---+-----------------------------+

Здесь я вынул вложенный [], но вы можете легко добавить его обратно, если предпочитаете вывод должен выглядеть как [[562,Food,12 May 2018]] вместо [562,Food,12 May 2018].


Другой вариант-создать карту с помощью pyspark.sql.functions.create_map():

df.withColumn(
    'values',
    f.create_map(
        *reduce(
            list.__add__,
            [[f.lit(c), f.col(c)] for c in array_columns]
        )
    )
).select('Person', 'values').show(truncate=False)
#+------+--------------------------------------------------------------+
#|Person|values                                                        |
#+------+--------------------------------------------------------------+
#|Bob   |Map(Amount -> 562, Budget -> Food, Date -> 12 May 2018)       |
#|Bob   |Map(Amount -> 880, Budget -> Food, Date -> 01 June 2018)      |
#|Bob   |Map(Amount -> 380, Budget -> Household, Date ->  16 June 2018)|
#|Sue   |Map(Amount -> 85, Budget -> Household, Date ->  16 July 2018) |
#|Sue   |Map(Amount -> 963, Budget -> Household, Date ->  16 Sept 2018)|
#+------+--------------------------------------------------------------+

Или, возможно, вы хотите перейти непосредственно к отображению Person->array:

df.withColumn('values', f.struct(*[f.col(c) for c in array_columns]))\
    .withColumn('map',f.create_map(f.col('Person'), f.col('values')))\
    .select('map')\
    .show(truncate=False)
#+-----------------------------------------+
#|map                                      |
#+-----------------------------------------+
#|Map(Bob -> [562,Food,12 May 2018])       |
#|Map(Bob -> [880,Food,01 June 2018])      |
#|Map(Bob -> [380,Household, 16 June 2018])|
#|Map(Sue -> [85,Household, 16 July 2018]) |
#|Map(Sue -> [963,Household, 16 Sept 2018])|
#+-----------------------------------------+