PySpark-потеря строковых значений при создании пар значений ключей
Мне нужно создать пары значений ключей для каждой строки в фрейме данных / RDD. То есть каждый человек станет ключом для каждой строки, а связанная с ними транзакция-это список, который становится значением.
У меня есть следующий пример, чтобы проиллюстрировать мою проблему:
a = [
('Bob', 562,"Food", "12 May 2018"),
('Bob',880,"Food","01 June 2018"),
('Bob',380,'Household'," 16 June 2018"),
('Sue',85,'Household'," 16 July 2018"),
('Sue',963,'Household'," 16 Sept 2018")
]
df = spark.createDataFrame(a, ["Person", "Amount","Budget", "Date"])
Затем я создаю функцию для выполнения пары значений ключей для каждой строки
def make_keys_and_value(row):
""" Convert the dataframe rows into key value pairs
"""
return (row["Person"], [[row["Amount"], row["Budget"],
row["Date"]]])
person_summarries_rdd = df.rdd.map(lambda row : make_keys_and_value(row))
Однако, когда я хочу показать результаты, Budget и Date становятся нулями. Я думаю, это связано с тем, что они струнные ценности.
person_summarries_rdd.toDF().show(5,False)
+---+-------------------------------+
|_1 |_2 |
+---+-------------------------------+
|Bob|[WrappedArray(562, null, null)]|
|Bob|[WrappedArray(880, null, null)]|
|Bob|[WrappedArray(380, null, null)]|
|Sue|[WrappedArray(85, null, null)] |
|Sue|[WrappedArray(963, null, null)]|
+---+-------------------------------+
Мне нужно сохранить значения строк, все еще используя этот метод.
1 ответ:
Нет необходимости сериализоваться в
rdd. Вы можете использоватьpyspark.sql.functions.struct():import pyspark.sql.function as f df.withColumn('values', f.struct(f.col('Amount'), f.col('Budget'), f.col('Date')))\ .select('Person', 'values').show(truncate=False) #+------+-----------------------------+ #|Person|values | #+------+-----------------------------+ #|Bob |[562,Food,12 May 2018] | #|Bob |[880,Food,01 June 2018] | #|Bob |[380,Household, 16 June 2018]| #|Sue |[85,Household, 16 July 2018] | #|Sue |[963,Household, 16 Sept 2018]| #+------+-----------------------------+Или используя понимание списка:
array_columns = [c for c in df.columns if c != 'Person'] df.withColumn('values', f.struct(*[f.col(c) for c in array_columns]))\ .select('Person', 'values').show(truncate=False) #+------+-----------------------------+ #|Person|values | #+------+-----------------------------+ #|Bob |[562,Food,12 May 2018] | #|Bob |[880,Food,01 June 2018] | #|Bob |[380,Household, 16 June 2018]| #|Sue |[85,Household, 16 July 2018] | #|Sue |[963,Household, 16 Sept 2018]| #+------+-----------------------------+
Ваш код не работает, потому что вы не можете иметь смешанные типы в
WrappedArray(). Spark выводит тип из первого элемента (Amount).Вы можете либо привести
Amountкstr:def make_keys_and_value(row): """ Convert the dataframe rows into key value pairs """ return (row["Person"], [[str(row["Amount"]), row["Budget"], row["Date"]]]) person_summarries_rdd = df.rdd.map(lambda row : make_keys_and_value(row)) person_summarries_rdd.toDF().show(truncate=False) #+---+---------------------------------------------+ #|_1 |_2 | #+---+---------------------------------------------+ #|Bob|[WrappedArray(562, Food, 12 May 2018)] | #|Bob|[WrappedArray(880, Food, 01 June 2018)] | #|Bob|[WrappedArray(380, Household, 16 June 2018)]| #|Sue|[WrappedArray(85, Household, 16 July 2018)] | #|Sue|[WrappedArray(963, Household, 16 Sept 2018)]| #+---+---------------------------------------------+Или используйте
tupleвместоlist:def make_keys_and_value(row): """ Convert the dataframe rows into key value pairs """ return (row["Person"], ((row["Amount"]), row["Budget"], row["Date"])) person_summarries_rdd = df.rdd.map(lambda row : make_keys_and_value(row)) #+---+-----------------------------+ #|_1 |_2 | #+---+-----------------------------+ #|Bob|[562,Food,12 May 2018] | #|Bob|[880,Food,01 June 2018] | #|Bob|[380,Household, 16 June 2018]| #|Sue|[85,Household, 16 July 2018] | #|Sue|[963,Household, 16 Sept 2018]| #+---+-----------------------------+Здесь я вынул вложенный
[], но вы можете легко добавить его обратно, если предпочитаете вывод должен выглядеть как[[562,Food,12 May 2018]]вместо[562,Food,12 May 2018].
Другой вариант-создать карту с помощью
pyspark.sql.functions.create_map():df.withColumn( 'values', f.create_map( *reduce( list.__add__, [[f.lit(c), f.col(c)] for c in array_columns] ) ) ).select('Person', 'values').show(truncate=False) #+------+--------------------------------------------------------------+ #|Person|values | #+------+--------------------------------------------------------------+ #|Bob |Map(Amount -> 562, Budget -> Food, Date -> 12 May 2018) | #|Bob |Map(Amount -> 880, Budget -> Food, Date -> 01 June 2018) | #|Bob |Map(Amount -> 380, Budget -> Household, Date -> 16 June 2018)| #|Sue |Map(Amount -> 85, Budget -> Household, Date -> 16 July 2018) | #|Sue |Map(Amount -> 963, Budget -> Household, Date -> 16 Sept 2018)| #+------+--------------------------------------------------------------+Или, возможно, вы хотите перейти непосредственно к отображению
Person->array:df.withColumn('values', f.struct(*[f.col(c) for c in array_columns]))\ .withColumn('map',f.create_map(f.col('Person'), f.col('values')))\ .select('map')\ .show(truncate=False) #+-----------------------------------------+ #|map | #+-----------------------------------------+ #|Map(Bob -> [562,Food,12 May 2018]) | #|Map(Bob -> [880,Food,01 June 2018]) | #|Map(Bob -> [380,Household, 16 June 2018])| #|Map(Sue -> [85,Household, 16 July 2018]) | #|Map(Sue -> [963,Household, 16 Sept 2018])| #+-----------------------------------------+