PySpark-потеря строковых значений при создании пар значений ключей
Мне нужно создать пары значений ключей для каждой строки в фрейме данных / RDD. То есть каждый человек станет ключом для каждой строки, а связанная с ними транзакция-это список, который становится значением.
У меня есть следующий пример, чтобы проиллюстрировать мою проблему:
a = [
('Bob', 562,"Food", "12 May 2018"),
('Bob',880,"Food","01 June 2018"),
('Bob',380,'Household'," 16 June 2018"),
('Sue',85,'Household'," 16 July 2018"),
('Sue',963,'Household'," 16 Sept 2018")
]
df = spark.createDataFrame(a, ["Person", "Amount","Budget", "Date"])
Затем я создаю функцию для выполнения пары значений ключей для каждой строки
def make_keys_and_value(row):
""" Convert the dataframe rows into key value pairs
"""
return (row["Person"], [[row["Amount"], row["Budget"],
row["Date"]]])
person_summarries_rdd = df.rdd.map(lambda row : make_keys_and_value(row))
Однако, когда я хочу показать результаты, Budget
и Date
становятся нулями. Я думаю, это связано с тем, что они струнные ценности.
person_summarries_rdd.toDF().show(5,False)
+---+-------------------------------+
|_1 |_2 |
+---+-------------------------------+
|Bob|[WrappedArray(562, null, null)]|
|Bob|[WrappedArray(880, null, null)]|
|Bob|[WrappedArray(380, null, null)]|
|Sue|[WrappedArray(85, null, null)] |
|Sue|[WrappedArray(963, null, null)]|
+---+-------------------------------+
Мне нужно сохранить значения строк, все еще используя этот метод.
1 ответ:
Нет необходимости сериализоваться в
rdd
. Вы можете использоватьpyspark.sql.functions.struct()
:import pyspark.sql.function as f df.withColumn('values', f.struct(f.col('Amount'), f.col('Budget'), f.col('Date')))\ .select('Person', 'values').show(truncate=False) #+------+-----------------------------+ #|Person|values | #+------+-----------------------------+ #|Bob |[562,Food,12 May 2018] | #|Bob |[880,Food,01 June 2018] | #|Bob |[380,Household, 16 June 2018]| #|Sue |[85,Household, 16 July 2018] | #|Sue |[963,Household, 16 Sept 2018]| #+------+-----------------------------+
Или используя понимание списка:
array_columns = [c for c in df.columns if c != 'Person'] df.withColumn('values', f.struct(*[f.col(c) for c in array_columns]))\ .select('Person', 'values').show(truncate=False) #+------+-----------------------------+ #|Person|values | #+------+-----------------------------+ #|Bob |[562,Food,12 May 2018] | #|Bob |[880,Food,01 June 2018] | #|Bob |[380,Household, 16 June 2018]| #|Sue |[85,Household, 16 July 2018] | #|Sue |[963,Household, 16 Sept 2018]| #+------+-----------------------------+
Ваш код не работает, потому что вы не можете иметь смешанные типы в
WrappedArray()
. Spark выводит тип из первого элемента (Amount
).Вы можете либо привести
Amount
кstr
:def make_keys_and_value(row): """ Convert the dataframe rows into key value pairs """ return (row["Person"], [[str(row["Amount"]), row["Budget"], row["Date"]]]) person_summarries_rdd = df.rdd.map(lambda row : make_keys_and_value(row)) person_summarries_rdd.toDF().show(truncate=False) #+---+---------------------------------------------+ #|_1 |_2 | #+---+---------------------------------------------+ #|Bob|[WrappedArray(562, Food, 12 May 2018)] | #|Bob|[WrappedArray(880, Food, 01 June 2018)] | #|Bob|[WrappedArray(380, Household, 16 June 2018)]| #|Sue|[WrappedArray(85, Household, 16 July 2018)] | #|Sue|[WrappedArray(963, Household, 16 Sept 2018)]| #+---+---------------------------------------------+
Или используйте
tuple
вместоlist
:def make_keys_and_value(row): """ Convert the dataframe rows into key value pairs """ return (row["Person"], ((row["Amount"]), row["Budget"], row["Date"])) person_summarries_rdd = df.rdd.map(lambda row : make_keys_and_value(row)) #+---+-----------------------------+ #|_1 |_2 | #+---+-----------------------------+ #|Bob|[562,Food,12 May 2018] | #|Bob|[880,Food,01 June 2018] | #|Bob|[380,Household, 16 June 2018]| #|Sue|[85,Household, 16 July 2018] | #|Sue|[963,Household, 16 Sept 2018]| #+---+-----------------------------+
Здесь я вынул вложенный
[]
, но вы можете легко добавить его обратно, если предпочитаете вывод должен выглядеть как[[562,Food,12 May 2018]]
вместо[562,Food,12 May 2018]
.
Другой вариант-создать карту с помощью
pyspark.sql.functions.create_map()
:df.withColumn( 'values', f.create_map( *reduce( list.__add__, [[f.lit(c), f.col(c)] for c in array_columns] ) ) ).select('Person', 'values').show(truncate=False) #+------+--------------------------------------------------------------+ #|Person|values | #+------+--------------------------------------------------------------+ #|Bob |Map(Amount -> 562, Budget -> Food, Date -> 12 May 2018) | #|Bob |Map(Amount -> 880, Budget -> Food, Date -> 01 June 2018) | #|Bob |Map(Amount -> 380, Budget -> Household, Date -> 16 June 2018)| #|Sue |Map(Amount -> 85, Budget -> Household, Date -> 16 July 2018) | #|Sue |Map(Amount -> 963, Budget -> Household, Date -> 16 Sept 2018)| #+------+--------------------------------------------------------------+
Или, возможно, вы хотите перейти непосредственно к отображению
Person->array
:df.withColumn('values', f.struct(*[f.col(c) for c in array_columns]))\ .withColumn('map',f.create_map(f.col('Person'), f.col('values')))\ .select('map')\ .show(truncate=False) #+-----------------------------------------+ #|map | #+-----------------------------------------+ #|Map(Bob -> [562,Food,12 May 2018]) | #|Map(Bob -> [880,Food,01 June 2018]) | #|Map(Bob -> [380,Household, 16 June 2018])| #|Map(Sue -> [85,Household, 16 July 2018]) | #|Map(Sue -> [963,Household, 16 Sept 2018])| #+-----------------------------------------+