Pyspark: Лучшая практика, чтобы добавить дополнительные столбцы к таблице данных
СПАРК таблицы данных имеет Способ withColumn
, чтобы добавить новый столбец одновременно. Для добавления нескольких столбцов требуется цепочка withColumn
s. Является ли это лучшей практикой для этого?
mapPartitions
имеет больше преимуществ. Допустим, у меня есть цепочка из трех withColumn
s, а затем один фильтр для удаления Row
s, основанный на определенных условиях. Это четыре различные операции (хотя я не уверен, что какая-либо из них является широкими преобразованиями). Но я могу сделать все это за один раз, если я сделаю mapPartitions
. Это также помогает, если у меня есть подключение к базе данных, которое я предпочел бы открыть один раз на раздел RDD.
Мой вопрос состоит из двух частей.
Первая часть, это моя реализация mapPartitions. Есть ли какие-либо непредвиденные проблемы с этим подходом? И есть ли более элегантный способ сделать это?df2 = df.rdd.mapPartitions(add_new_cols).toDF()
def add_new_cols(rows):
db = open_db_connection()
new_rows = []
new_row_1 = Row("existing_col_1", "existing_col_2", "new_col_1", "new_col_2")
i = 0
for each_row in rows:
i += 1
# conditionally omit rows
if i % 3 == 0:
continue
db_result = db.get_some_result(each_row.existing_col_2)
new_col_1 = ''.join([db_result, "_NEW"])
new_col_2 = db_result
new_f_row = new_row_1(each_row.existing_col_1, each_row.existing_col_2, new_col_1, new_col_2)
new_rows.append(new_f_row)
db.close()
return iter(new_rows)
Вторая часть, каковы компромиссы в использовании mapPartitions
по цепочке withColumn
и filter
?
Я где-то читал, что использование доступных методов с Spark DFs всегда лучше, чем разворачивать свою собственную реализацию. Пожалуйста, дайте мне знать, если мои аргументы неверны. Спасибо! Все мысли приветствуются.
2 ответа:
Есть ли какие-либо непредвиденные проблемы с этим подходом?
Несколько. Наиболее серьезные последствия заключаются в следующем:
- в несколько раз больший объем памяти по сравнению с обычным
Высокая стоимость сериализации и десериализации, необходимых для перемещения данных между контекстами выполнения.DataFrame
кодом и значительные затраты на сборку мусора.- введение критической точки в планировщик запросов.
- как есть, стоимость вывода схемы на вызов
toDF
(может следует избегать, если предусмотрена правильная схема) и возможное повторное выполнение всех предыдущих шагов.- и так далее...
Некоторых из них можно избежать с помощью
udf
иselect
/withColumn
, другие не могут.Предположим, у меня есть цепочка из трех withColumns, а затем один фильтр для удаления строк на основе определенных условий. Это четыре различные операции (хотя я не уверен, что какая-либо из них является широкими преобразованиями). Но я могу сделать все это за один раз, если я сделаю mapPartitionsВаш
mapPartitions
не удаляет никаких операций и не обеспечивает никаких оптимизаций, которые Spark planner не может исключить. Его единственным преимуществом является то, что он обеспечивает хороший простор для дорогих объектов подключения.Я где-то читал, что использование доступных методов с Spark DFs всегда лучше, чем развертывание собственной реализации
Когда вы начинаете использовать исполнительскую логику Python, вы уже расходитесь с Spark SQL. Не важно, если вы используете
udf
,RDD
или недавно добавленный векторизованный udf. В конце дня вы должны принять решение, основанное на общей структуре вашего кода - если это преимущественно Python логика, выполняемая непосредственно на данных, возможно, лучше придерживатьсяRDD
или полностью пропустить Spark.Если это всего лишь часть логики и не вызывает серьезных проблем с производительностью, не беспокойтесь об этом.