как сделать saveAsTextFile не разделить вывод на несколько файлов?


при использовании Scala в Spark, всякий раз, когда я сбрасываю результаты с помощью saveAsTextFile, Кажется, разделить выход на несколько частей. Я просто передаю ему параметр (путь).

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")
  1. соответствует ли количество выходов количеству редукторов, которые он использует?
  2. означает ли это, что выход сжат?
  3. Я знаю, что могу объединить вывод вместе с помощью bash, но есть ли возможность хранить вывод в одном текстовом файле, без раскол?? Я посмотрел на документы API, но это мало что говорит об этом.
9 65

9 ответов:

причина, по которой он сохраняет его как несколько файлов, заключается в том, что вычисление распределено. Если выход достаточно мал, так что вы думаете, что вы можете поместить его на одной машине, то вы можете закончить свою программу с

val arr = year.collect()

а затем сохраните полученный массив в виде файла, другой способ-использовать пользовательский разделитель,partitionBy, и сделать так, чтобы все идет в один раздел, хотя это не рекомендуется, потому что вы не получите никакого распараллеливания.

если вы требуете, чтобы файл был сохранен с saveAsTextFile можно использовать coalesce(1,true).saveAsTextFile(). Это в основном означает, что вычисление затем объединяется в 1 раздел. Вы также можете использовать repartition(1) это просто обертка для coalesce С аргументом shuffle, установленным в true. Просматривая источник RDD.скала как я понял большую часть этого материала, вы должны взглянуть.

можно назвать coalesce(1) а то saveAsTextFile() - но это может быть плохой идеей, если у вас много данных. Отдельные файлы на разделение генерируются так же, как и в Hadoop, чтобы отдельные картографы и редукторы записывались в разные файлы. Наличие одного выходного файла - это только хорошая идея, если у вас очень мало данных, и в этом случае вы можете также собрать (), как сказал @aaronman.

для тех, кто работает с большой набор:

  • rdd.collect() не следует использовать в этом случае, как это будет собрать все данные как Array в драйвере, который является самым простым способом, чтобы выйти из памяти.

  • rdd.coalesce(1).saveAsTextFile() также не следует использовать, так как параллелизм вышестоящих этапов будет потерян для выполнения на одном узле, где будут храниться данные от.

  • rdd.coalesce(1, shuffle = true).saveAsTextFile()это лучший простой вариант как это будет держать обработку восходящих задач параллельно, а затем выполнять только перетасовку на один узел (rdd.repartition(1).saveAsTextFile() - это точный синоним).

  • rdd.saveAsSingleTextFile() как предусмотрено ниже дополнительно позволяет хранить rdd в одном файле С определенным именем сохраняя при этом свойства параллелизма rdd.coalesce(1, shuffle = true).saveAsTextFile().

что может быть неудобно с rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt") это то, что он на самом деле производит файл, путь которого path/to/file.txt/part-00000, а не path/to/file.txt.

следующее решение rdd.saveAsSingleTextFile("path/to/file.txt") фактически создаст файл, путь к которому path/to/file.txt:

package com.whatever.package

import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.CompressionCodec

object SparkHelper {

  // This is an implicit class so that saveAsSingleTextFile can be attached to
  // SparkContext and be called like this: sc.saveAsSingleTextFile
  implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal {

    def saveAsSingleTextFile(path: String): Unit =
      saveAsSingleTextFileInternal(path, None)

    def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit =
      saveAsSingleTextFileInternal(path, Some(codec))

    private def saveAsSingleTextFileInternal(
        path: String, codec: Option[Class[_ <: CompressionCodec]]
    ): Unit = {

      // The interface with hdfs:
      val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)

      // Classic saveAsTextFile in a temporary folder:
      hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it's not there already
      codec match {
        case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec)
        case None        => rdd.saveAsTextFile(s"$path.tmp")
      }

      // Merge the folder of resulting part-xxxxx into one file:
      hdfs.delete(new Path(path), true) // to make sure it's not there already
      FileUtil.copyMerge(
        hdfs, new Path(s"$path.tmp"),
        hdfs, new Path(path),
        true, rdd.sparkContext.hadoopConfiguration, null
      )

      hdfs.delete(new Path(s"$path.tmp"), true)
    }
  }
}

который может быть использован таким образом:

import com.whatever.package.SparkHelper.RDDExtensions

rdd.saveAsSingleTextFile("path/to/file.txt")

// Or if the produced file is to be compressed:
import org.apache.hadoop.io.compress.GzipCodec
rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec])

этот фрагмент сначала сохраняет rdd с rdd.saveAsTextFile("path/to/file.txt") во временную папку path/to/file.txt.tmp как будто мы не хотим хранить данные в одном файле (который сохраняет обработку восходящих задач параллельный.)

и то ТОЛЬКО, используя файловой системы Hadoop API-интерфейс мы приступаем к слияние (FileUtil.copyMerge()) из различных выходных файлов, чтобы создать наш окончательный выходной один файл path/to/file.txt.

Как уже упоминалось, вы можете собрать или объединить свой набор данных, чтобы заставить Spark создать один файл. Но это также ограничивает количество задач Spark, которые могут работать с вашим набором данных параллельно. Я предпочитаю, чтобы он создал сто файлов в выходном каталоге HDFS, а затем использовал hadoop fs -getmerge /hdfs/dir /local/file.txt для извлечения результатов в один файл в локальной файловой системе. Это имеет наибольший смысл, когда ваш вывод является относительно небольшим отчетом, конечно.

вы сможете сделать это в следующей версии Spark, в текущей версии 1.0.0 это невозможно, если вы не сделаете это вручную, например, как вы упомянули, с вызовом скрипта bash.

Я также хочу отметить, что в документации четко указано, что пользователи должны быть осторожны при вызове coalesce с реальным небольшим количеством разделов . это может привести к тому, что вышестоящие разделы наследуют это число разделов.

Я бы не рекомендовал использовать coalesce(1), если это действительно не требуется.

в Spark 1.6.1 формат, как показано ниже. Он создает один выход file.It лучше всего использовать его, если выход достаточно мал для обработки.В основном то, что он делает, заключается в том, что он возвращает новый RDD, который сводится к разделам numPartitions.Если вы делаете резкое объединение, например, для numPartitions = 1, это может привести к тому, что ваши вычисления будут выполняться на меньшем количестве узлов, чем вам нравится (например, один узел в случае numPartitions = 1)

pair_result.coalesce(1).saveAsTextFile("/app/data/")

можно назвать repartition() и следовать таким образом:

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)

var repartitioned = year.repartition(1)
repartitioned.saveAsTextFile("C:/Users/TheBhaskarDas/Desktop/wc_spark00")

enter image description here

вот мой ответ, чтобы вывести один файл. Я только что добавил coalesce(1)

val year = sc.textFile("apat63_99.txt")
              .map(_.split(",")(1))
              .flatMap(_.split(","))
              .map((_,1))
              .reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")

код:

year.coalesce(1).saveAsTextFile("year")