как сделать saveAsTextFile не разделить вывод на несколько файлов?
при использовании Scala в Spark, всякий раз, когда я сбрасываю результаты с помощью saveAsTextFile
, Кажется, разделить выход на несколько частей. Я просто передаю ему параметр (путь).
val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")
- соответствует ли количество выходов количеству редукторов, которые он использует?
- означает ли это, что выход сжат?
- Я знаю, что могу объединить вывод вместе с помощью bash, но есть ли возможность хранить вывод в одном текстовом файле, без раскол?? Я посмотрел на документы API, но это мало что говорит об этом.
9 ответов:
причина, по которой он сохраняет его как несколько файлов, заключается в том, что вычисление распределено. Если выход достаточно мал, так что вы думаете, что вы можете поместить его на одной машине, то вы можете закончить свою программу с
val arr = year.collect()
а затем сохраните полученный массив в виде файла, другой способ-использовать пользовательский разделитель,
partitionBy
, и сделать так, чтобы все идет в один раздел, хотя это не рекомендуется, потому что вы не получите никакого распараллеливания.если вы требуете, чтобы файл был сохранен с
saveAsTextFile
можно использоватьcoalesce(1,true).saveAsTextFile()
. Это в основном означает, что вычисление затем объединяется в 1 раздел. Вы также можете использоватьrepartition(1)
это просто обертка дляcoalesce
С аргументом shuffle, установленным в true. Просматривая источник RDD.скала как я понял большую часть этого материала, вы должны взглянуть.
можно назвать
coalesce(1)
а тоsaveAsTextFile()
- но это может быть плохой идеей, если у вас много данных. Отдельные файлы на разделение генерируются так же, как и в Hadoop, чтобы отдельные картографы и редукторы записывались в разные файлы. Наличие одного выходного файла - это только хорошая идея, если у вас очень мало данных, и в этом случае вы можете также собрать (), как сказал @aaronman.
для тех, кто работает с большой набор:
rdd.collect()
не следует использовать в этом случае, как это будет собрать все данные какArray
в драйвере, который является самым простым способом, чтобы выйти из памяти.
rdd.coalesce(1).saveAsTextFile()
также не следует использовать, так как параллелизм вышестоящих этапов будет потерян для выполнения на одном узле, где будут храниться данные от.
rdd.coalesce(1, shuffle = true).saveAsTextFile()
это лучший простой вариант как это будет держать обработку восходящих задач параллельно, а затем выполнять только перетасовку на один узел (rdd.repartition(1).saveAsTextFile()
- это точный синоним).
rdd.saveAsSingleTextFile()
как предусмотрено ниже дополнительно позволяет хранить rdd в одном файле С определенным именем сохраняя при этом свойства параллелизмаrdd.coalesce(1, shuffle = true).saveAsTextFile()
.что может быть неудобно с
rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt")
это то, что он на самом деле производит файл, путь которогоpath/to/file.txt/part-00000
, а неpath/to/file.txt
.следующее решение
rdd.saveAsSingleTextFile("path/to/file.txt")
фактически создаст файл, путь к которомуpath/to/file.txt
:package com.whatever.package import org.apache.spark.rdd.RDD import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} import org.apache.hadoop.io.compress.CompressionCodec object SparkHelper { // This is an implicit class so that saveAsSingleTextFile can be attached to // SparkContext and be called like this: sc.saveAsSingleTextFile implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal { def saveAsSingleTextFile(path: String): Unit = saveAsSingleTextFileInternal(path, None) def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = saveAsSingleTextFileInternal(path, Some(codec)) private def saveAsSingleTextFileInternal( path: String, codec: Option[Class[_ <: CompressionCodec]] ): Unit = { // The interface with hdfs: val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration) // Classic saveAsTextFile in a temporary folder: hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it's not there already codec match { case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec) case None => rdd.saveAsTextFile(s"$path.tmp") } // Merge the folder of resulting part-xxxxx into one file: hdfs.delete(new Path(path), true) // to make sure it's not there already FileUtil.copyMerge( hdfs, new Path(s"$path.tmp"), hdfs, new Path(path), true, rdd.sparkContext.hadoopConfiguration, null ) hdfs.delete(new Path(s"$path.tmp"), true) } } }
который может быть использован таким образом:
import com.whatever.package.SparkHelper.RDDExtensions rdd.saveAsSingleTextFile("path/to/file.txt") // Or if the produced file is to be compressed: import org.apache.hadoop.io.compress.GzipCodec rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec])
этот фрагмент сначала сохраняет rdd с
rdd.saveAsTextFile("path/to/file.txt")
во временную папкуpath/to/file.txt.tmp
как будто мы не хотим хранить данные в одном файле (который сохраняет обработку восходящих задач параллельный.)и то ТОЛЬКО, используя файловой системы Hadoop API-интерфейс мы приступаем к слияние (
FileUtil.copyMerge()
) из различных выходных файлов, чтобы создать наш окончательный выходной один файлpath/to/file.txt
.
Как уже упоминалось, вы можете собрать или объединить свой набор данных, чтобы заставить Spark создать один файл. Но это также ограничивает количество задач Spark, которые могут работать с вашим набором данных параллельно. Я предпочитаю, чтобы он создал сто файлов в выходном каталоге HDFS, а затем использовал
hadoop fs -getmerge /hdfs/dir /local/file.txt
для извлечения результатов в один файл в локальной файловой системе. Это имеет наибольший смысл, когда ваш вывод является относительно небольшим отчетом, конечно.
вы сможете сделать это в следующей версии Spark, в текущей версии 1.0.0 это невозможно, если вы не сделаете это вручную, например, как вы упомянули, с вызовом скрипта bash.
Я также хочу отметить, что в документации четко указано, что пользователи должны быть осторожны при вызове coalesce с реальным небольшим количеством разделов . это может привести к тому, что вышестоящие разделы наследуют это число разделов.
Я бы не рекомендовал использовать coalesce(1), если это действительно не требуется.
в Spark 1.6.1 формат, как показано ниже. Он создает один выход file.It лучше всего использовать его, если выход достаточно мал для обработки.В основном то, что он делает, заключается в том, что он возвращает новый RDD, который сводится к разделам numPartitions.Если вы делаете резкое объединение, например, для numPartitions = 1, это может привести к тому, что ваши вычисления будут выполняться на меньшем количестве узлов, чем вам нравится (например, один узел в случае numPartitions = 1)
pair_result.coalesce(1).saveAsTextFile("/app/data/")