СПАРК - передел() против объединиться()


по данным Learning Spark

имейте в виду, что перераспределение данных является довольно дорогостоящей операцией. Spark также имеет оптимизированную версию repartition() под названием coalesce (), которая позволяет избежать перемещения данных, но только если вы уменьшаете количество разделов RDD.

одно отличие, которое я получаю, заключается в том, что с переделом() количество разделов может быть увеличено/уменьшено, но с объединением () количество разделов может быть только уменьшившийся.

Если разделы распределены по нескольким машинам и выполняется coalesce (), как он может избежать перемещения данных?

7 126

7 ответов:

настоящее полное перемешать. Если известно, что число уменьшается, то исполнитель может безопасно хранить данные на минимальном количестве разделов, только перемещая данные с дополнительных узлов на узлы, которые мы сохранили.

Итак, это будет что-то вроде этого:

Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12

затем coalesce до 2 разделов:

Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)

обратите внимание, что узел 1 и узел 3 не требуют исходных данных для перемещения.

ответ Джастина является удивительным, и этот ответ идет в более глубокую.

The repartition алгоритм делает полную перетасовку и создает новые разделы с данными, которые распределены равномерно. Давайте создадим фрейм данных с числами от 1 до 12.

val x = (1 to 12).toList
val numbersDf = x.toDF("number")

numbersDf содержит 4 раздела на моей машине.

numbersDf.rdd.partitions.size // => 4

вот как данные делятся на разделы:

Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12

давайте сделаем полную перетасовку с repartition метод и получить эти данные на двух узлах.

val numbersDfR = numbersDf.repartition(2)

вот как numbersDfR данные разделены на моей машине:

Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11

The repartition метод создает новые разделы и равномерно распределяет данные в новых разделах (распределение данных более равномерное для больших наборов данных).

разницу между coalesce и repartition

coalesce использует существующие разделы для минимизации объема перетасованных данных. repartition создает новые разделы и делает полную перетасовку. coalesce результаты в разделах с различными объемами данных (иногда разделы, которые имеют очень разные размеры) и repartition приводит к примерно равному размеру разделов.

и coalesce или repartition быстрее?

coalesce может работать быстрее, чем repartition, но неравные по размеру разделы обычно работают медленнее, чем равные по размеру разделы. Как правило, после фильтрации наборов данных необходимо выполнить повторное разбиение большие наборы данных. Я нашел repartition чтобы быть быстрее в целом, потому что Spark построен для работы с одинаковыми размерами разделов.

прочитайте это сообщение в блоге если вы хотите еще более детально.

еще один момент, который следует отметить здесь, заключается в том, что основным принципом Spark RDD является неизменность. Передел или объединение создаст новый RDD. Базовый RDD будет продолжать существовать со своим исходным количеством разделов. В случае, если прецедент требует сохранения RDD в кэше, то то же самое должно быть сделано для вновь созданного RDD.

scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26

scala> res16.partitions.length
res17: Int = 10

scala>  pairMrkt.partitions.length
res20: Int = 2

все ответы добавляют некоторые большие знания в этот очень часто задаваемый вопрос.

Итак, следуя традиции временной шкалы этого вопроса, вот мои 2 цента.

нашел передел должен быть быстрее, чем коалесцировать в очень конкретном случае.

в моем приложении, когда количество файлов, которые мы оцениваем, ниже определенного порога, передел работает быстрее.

вот что я имею в виду

if(numFiles > 20)
    df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
    df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)

в приведенном выше фрагменте кода, если мои файлы были меньше 20, coalesce занимал вечность, чтобы закончить, в то время как передел был намного быстрее, и поэтому приведенный выше код.

конечно, эта цифра (20) будет зависеть от количества работников и объема данных.

надеюсь, что это поможет.

простым способом COALESCE : - это только для уменьшения количества разделов, без перетасовки данных он просто сжимает разделы

передел: - для увеличения и уменьшения нет разделов , но перетасовка происходит

пример:

val rdd = sc.textFile("path",7)
rdd.repartition(10)
rdd.repartition(2)

Как работает

но мы обычно идем на эти две вещи, когда нам нужно увидеть вывод в одном кластере, мы идем с этим.

но также вы должны убедиться, что данные, которые приходят коалесцируют узлы должны иметь высокую конфигурацию, если вы имеете дело с огромными данными. Поскольку все данные будут загружены на эти узлы, может привести исключение памяти. Хотя возмещение стоит дорого, я предпочитаю использовать его. Так как он перемешивает и распределяет данные поровну.

будьте мудры, чтобы выбрать между объединением и переделом.

повторное разбиение-рекомендуется использовать повторное разбиение, не увеличивая ни одного из разделов, поскольку это связано с перетасовкой всех данных.

коалесцировать-порекомендованы, что использует коалесцирует пока уменьшающ никакое разделов. Например, если у вас есть 3 раздела и вы хотите уменьшить его до 2 разделов, Coalesce переместит данные 3-го раздела В разделы 1 и 2. Раздел 1 и 2 останется в том же контейнере.но передел будет перетасовывать данные во всех разделах, поэтому использование сети между исполнителем будет высокий и это влияет на производительность.

представление велемудрое коалесцирует представление более лучшее чем перераспределение пока уменьшающ никакое разделов.