Pyspark: Лучшая практика, чтобы добавить дополнительные столбцы к таблице данных
СПАРК таблицы данных имеет Способ withColumn, чтобы добавить новый столбец одновременно. Для добавления нескольких столбцов требуется цепочка withColumn s. Является ли это лучшей практикой для этого?
mapPartitions имеет больше преимуществ. Допустим, у меня есть цепочка из трех withColumn s, а затем один фильтр для удаления Row s, основанный на определенных условиях. Это четыре различные операции (хотя я не уверен, что какая-либо из них является широкими преобразованиями). Но я могу сделать все это за один раз, если я сделаю mapPartitions. Это также помогает, если у меня есть подключение к базе данных, которое я предпочел бы открыть один раз на раздел RDD.
Мой вопрос состоит из двух частей.
Первая часть, это моя реализация mapPartitions. Есть ли какие-либо непредвиденные проблемы с этим подходом? И есть ли более элегантный способ сделать это?df2 = df.rdd.mapPartitions(add_new_cols).toDF()
def add_new_cols(rows):
db = open_db_connection()
new_rows = []
new_row_1 = Row("existing_col_1", "existing_col_2", "new_col_1", "new_col_2")
i = 0
for each_row in rows:
i += 1
# conditionally omit rows
if i % 3 == 0:
continue
db_result = db.get_some_result(each_row.existing_col_2)
new_col_1 = ''.join([db_result, "_NEW"])
new_col_2 = db_result
new_f_row = new_row_1(each_row.existing_col_1, each_row.existing_col_2, new_col_1, new_col_2)
new_rows.append(new_f_row)
db.close()
return iter(new_rows)
Вторая часть, каковы компромиссы в использовании mapPartitions по цепочке withColumn и filter?
Я где-то читал, что использование доступных методов с Spark DFs всегда лучше, чем разворачивать свою собственную реализацию. Пожалуйста, дайте мне знать, если мои аргументы неверны. Спасибо! Все мысли приветствуются.
2 ответа:
Есть ли какие-либо непредвиденные проблемы с этим подходом?
Несколько. Наиболее серьезные последствия заключаются в следующем:
- в несколько раз больший объем памяти по сравнению с обычным
Высокая стоимость сериализации и десериализации, необходимых для перемещения данных между контекстами выполнения.DataFrameкодом и значительные затраты на сборку мусора.- введение критической точки в планировщик запросов.
- как есть, стоимость вывода схемы на вызов
toDF(может следует избегать, если предусмотрена правильная схема) и возможное повторное выполнение всех предыдущих шагов.- и так далее...
Некоторых из них можно избежать с помощью
udfиselect/withColumn, другие не могут.Предположим, у меня есть цепочка из трех withColumns, а затем один фильтр для удаления строк на основе определенных условий. Это четыре различные операции (хотя я не уверен, что какая-либо из них является широкими преобразованиями). Но я могу сделать все это за один раз, если я сделаю mapPartitionsВаш
mapPartitionsне удаляет никаких операций и не обеспечивает никаких оптимизаций, которые Spark planner не может исключить. Его единственным преимуществом является то, что он обеспечивает хороший простор для дорогих объектов подключения.Я где-то читал, что использование доступных методов с Spark DFs всегда лучше, чем развертывание собственной реализации
Когда вы начинаете использовать исполнительскую логику Python, вы уже расходитесь с Spark SQL. Не важно, если вы используете
udf,RDDили недавно добавленный векторизованный udf. В конце дня вы должны принять решение, основанное на общей структуре вашего кода - если это преимущественно Python логика, выполняемая непосредственно на данных, возможно, лучше придерживатьсяRDDили полностью пропустить Spark.Если это всего лишь часть логики и не вызывает серьезных проблем с производительностью, не беспокойтесь об этом.