Достижение параллелизма при сохранении в секционированный файл parquet


При записи dataframe в parquet с помощью partitionBy :

df.write.partitionBy("col1","col2","col3").parquet(path)
Я ожидал бы, что каждая записываемая секция выполнялась бы независимо от отдельной задачи и параллельно в зависимости от количества работников, назначенных на текущую работу spark.

Однако на самом деле есть только один рабочий/задача выполняется в то время, когда запись на паркет. Что один работник циклически проходит через каждый из разделов и записывает файлы .parquet последовательно. Почему это должно быть так? case-и есть ли способ принудить параллелизм в этой операции spark.write.parquet?

Следующее-этоне то, что я хочу видеть (должно быть 700%+ ..)

Введите описание изображения здесь

Из этого другого поста я также попытался добавить repartition спереди

Spark parquet разметка: большое количество файлов

df.repartition("col1","col2","col3").write.partitionBy("col1","col2","col3").parquet(path)
К сожалению, это не возымело никакого эффекта: все еще только один рабочий..

Примечание: Я работаю в режиме local с local[8] и видел другие операции spark выполняются одновременно с восемью рабочими и используют до 750% процессоров.

1 2

1 ответ:

Короче говоря, написание нескольких выходных файлов из одной задачи не распараллеливается, но если у вас есть несколько задач (несколько входных разбиений), каждая из них получит свое собственное ядро на работнике.

Целью записи секционированных данных не является распараллеливание операции записи. Spark уже делает это, одновременно выписывая несколько задач одновременно. Цель состоит только в том, чтобы оптимизировать будущие операции чтения, где вы хотите только один раздел сохраненного данные.

Логика записи разделов в Spark предназначена для чтения всех записей с предыдущего этапа только один раз при записи их в пункт назначения. Я считаю, что частью выбора дизайна является также защита от случая, когда ключ раздела имеет много-много значений.

Правка: Искра 2.X метод

В Искре 2.x, он сортирует записи в каждой задаче по их ключам разделов, а затем перебирает их, записывая по одному файловому дескриптору за раз. Я предполагаю, что они делают это, чтобы гарантировать, что они никогда не откроют огромное количество дескрипторов файлов, если есть много различных значений в ваших ключах разделов.

Для справки, вот сортировка:

Https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L121

Прокрутите немного вниз, и вы увидите, что он вызывает write(iter.next()) циклически через каждую строку.

И вот фактическое написание (один ключ файла / раздела одновременно):

Https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L121

Там вы можете видеть, что он держит только один дескриптор файла открытым одновременно.

Правка: Искра 1.X метод

Какая Искра 1.x does is for a given task is loop through all the records, открывая новый дескриптор файла всякий раз, когда он встречает новый выходной раздел, который он не видел раньше для эта задача. Затем он немедленно записывает запись в этот файловый дескриптор и переходит к следующему. Это означает, что в любой момент времени при обработке одной задачи может быть открыто до N дескрипторов файлов только для этой задачи, где N-максимальное число выходных разделов. Чтобы было понятнее, вот некоторые Python psuedo-код, чтобы показать общую идею:

# To write out records in a single InputSplit/Task
handles = {}
for row in input_split:
    partition_path = determine_output_path(row, partition_keys)
    if partition_path not in handles:
        handles[partition_path] = open(partition_path, 'w')

    handles[partition_path].write(row)

В приведенной выше стратегии записи записей есть один нюанс. В Искре 1.x параметр spark.sql.sources.maxConcurrentWrites устанавливает верхний предел маски дескрипторы файлов, которые могут быть открыты для каждой задачи. После этого Spark будет сортировать данные по ключу раздела, чтобы можно было перебирать записи, записывая по одному файлу за раз.