Pyspark: Лучшая практика, чтобы добавить дополнительные столбцы к таблице данных


СПАРК таблицы данных имеет Способ withColumn, чтобы добавить новый столбец одновременно. Для добавления нескольких столбцов требуется цепочка withColumn s. Является ли это лучшей практикой для этого?

Я чувствую, что использование mapPartitions имеет больше преимуществ. Допустим, у меня есть цепочка из трех withColumn s, а затем один фильтр для удаления Row s, основанный на определенных условиях. Это четыре различные операции (хотя я не уверен, что какая-либо из них является широкими преобразованиями). Но я могу сделать все это за один раз, если я сделаю mapPartitions. Это также помогает, если у меня есть подключение к базе данных, которое я предпочел бы открыть один раз на раздел RDD.

Мой вопрос состоит из двух частей.

Первая часть, это моя реализация mapPartitions. Есть ли какие-либо непредвиденные проблемы с этим подходом? И есть ли более элегантный способ сделать это?
df2 = df.rdd.mapPartitions(add_new_cols).toDF()

def add_new_cols(rows):
    db = open_db_connection()
    new_rows = []
    new_row_1 = Row("existing_col_1", "existing_col_2", "new_col_1", "new_col_2")
    i = 0
    for each_row in rows:
        i += 1
        # conditionally omit rows
        if i % 3 == 0:
            continue
        db_result = db.get_some_result(each_row.existing_col_2)
        new_col_1 = ''.join([db_result, "_NEW"])
        new_col_2 = db_result
        new_f_row = new_row_1(each_row.existing_col_1, each_row.existing_col_2, new_col_1, new_col_2)
        new_rows.append(new_f_row)

    db.close()
    return iter(new_rows)

Вторая часть, каковы компромиссы в использовании mapPartitions по цепочке withColumn и filter?

Я где-то читал, что использование доступных методов с Spark DFs всегда лучше, чем разворачивать свою собственную реализацию. Пожалуйста, дайте мне знать, если мои аргументы неверны. Спасибо! Все мысли приветствуются.

2 3

2 ответа:

Есть ли какие-либо непредвиденные проблемы с этим подходом?

Несколько. Наиболее серьезные последствия заключаются в следующем:

  • в несколько раз больший объем памяти по сравнению с обычным DataFrame кодом и значительные затраты на сборку мусора.
  • Высокая стоимость сериализации и десериализации, необходимых для перемещения данных между контекстами выполнения.
  • введение критической точки в планировщик запросов.
  • как есть, стоимость вывода схемы на вызов toDF (может следует избегать, если предусмотрена правильная схема) и возможное повторное выполнение всех предыдущих шагов.
  • и так далее...

Некоторых из них можно избежать с помощью udf и select / withColumn, другие не могут.

Предположим, у меня есть цепочка из трех withColumns, а затем один фильтр для удаления строк на основе определенных условий. Это четыре различные операции (хотя я не уверен, что какая-либо из них является широкими преобразованиями). Но я могу сделать все это за один раз, если я сделаю mapPartitions

Ваш mapPartitions не удаляет никаких операций и не обеспечивает никаких оптимизаций, которые Spark planner не может исключить. Его единственным преимуществом является то, что он обеспечивает хороший простор для дорогих объектов подключения.

Я где-то читал, что использование доступных методов с Spark DFs всегда лучше, чем развертывание собственной реализации

Когда вы начинаете использовать исполнительскую логику Python, вы уже расходитесь с Spark SQL. Не важно, если вы используете udf, RDD или недавно добавленный векторизованный udf. В конце дня вы должны принять решение, основанное на общей структуре вашего кода - если это преимущественно Python логика, выполняемая непосредственно на данных, возможно, лучше придерживаться RDD или полностью пропустить Spark.

Если это всего лишь часть логики и не вызывает серьезных проблем с производительностью, не беспокойтесь об этом.

Использование df.withColumn() является лучшим способом добавления столбцов. они все добавлены лениво