Как перезаписать выходной каталог в spark


У меня есть приложение Spark streaming, которое создает набор данных для каждой минуты. Мне нужно сохранить/перезаписать результаты обработанных данных.

когда я попытался перезаписать организацию набора данных.апаш.платформа Hadoop.mapred.FileAlreadyExistsException останавливает выполнение.

Я установил свойство Spark set("spark.files.overwrite","true"), но не повезло.

Как перезаписать или предварительно удалить файлы из spark?

8 73

8 ответов:

обновление: предложите использовать Dataframes, плюс что-то вроде ... .write.mode(SaveMode.Overwrite) ....

для более старых версий, попробуй

yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false")
val sc = SparkContext(yourSparkConf)

в 1.1.0 вы можете установить настройки conf с помощью скрипта spark-submit с флагом --conf.

предупреждение: согласно @piggybox есть ошибка в Spark, где он будет только перезаписывать файлы, которые ему нужно написать, это part- файлы, любые другие файлы будут оставлены без изменений.

документация по параметру spark.files.overwrite говорит это: "нужно ли перезаписывать файлы, добавленные через SparkContext.addFile() когда целевой файл существует и его содержимое не совпадает с исходным."Таким образом, это не влияет на метод saveAsTextFiles.

вы можете сделать это перед сохранением файла:

val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } }

Аас объяснил здесь: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696.html

С pyspark.язык SQL.Фрейм данных.сохранить документация (в настоящее время в 1.3.1), вы можете указать mode='overwrite' при сохранении фрейма данных:

myDataFrame.save(path='myPath', source='parquet', mode='overwrite')

Я проверил, что это даже удалит оставшиеся файлы разделов. Поэтому, если вы изначально сказали 10 разделов/файлов, но затем переписали папку с фреймом данных, который имел только 6 разделов, в результирующей папке будет 6 разделов/файлов.

посмотреть Spark SQL documentation дополнительные информация о параметрах режима.

С df.save(path, source, mode) является устаревшим, (http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame)

использовать df.write.format(source).mode("overwrite").save(path)
где ДФ.запись-это DataFrameWriter

'source' может быть ("com.databricks.искра.Авро " / "паркет" / "json")

df.писать.режим ('перезапись').parquet ("/output/folder/path") работает, если вы хотите перезаписать файл parquet с помощью python. Это в spark 1.6.2. API может отличаться в более поздних версиях

  val jobName = "WordCount";
  //overwrite the output directory in spark  set("spark.hadoop.validateOutputSpecs", "false")
  val conf = new 
  SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false");
  val sc = new SparkContext(conf)

Это перегруженная версия сохранить функция работает для меня:

yourDF.сохранить(поле "выходной путь", орг.апаш.искра.язык SQL.SaveMode.метод valueOf("переписать"))

в приведенном выше примере будет перезаписана существующая папка. Savemode также может принимать эти параметры (https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html):

добавить: режим добавления означает, что при сохранении фрейма данных в данные источник, если данные / таблица уже существуют, содержимое фрейма данных, как ожидается, будет добавлено к существующим данным.

ErrorIfExists: ErrorIfExists означает, что при сохранении фрейма данных в источник данных, если данные уже существуют, ожидается исключение.

игнорировать: режим игнорирования означает, что при сохранении фрейма данных в источник данных, если данные уже существуют, ожидается, что операция сохранения не сохранит содержимое фрейма данных и не менять существующие данные.

Если вы хотите использовать свой собственный формат вывода, вы также сможете получить желаемое поведение с помощью RDD.

посмотрите на следующие классы: FileOutputFormat, FileOutputCommitter

в выходном формате файла у вас есть метод с именем checkOutputSpecs, который проверяет, существует ли выходной каталог. В FileOutputCommitter у вас есть commitJob, который обычно передает данные из временного каталог на свое последнее место.

Я не был в состоянии проверить это пока нет (хотел сделать это, как только у меня появится несколько свободных минут) но теоретически: если я выражаю FileOutputFormat и переопределить checkOutputSpecs к методу, который не бросать исключение на каталог уже существует, и корректировать commitJob способ пользовательского вывода коммиттер для выполнения который-либо логики, что я хочу (например, переопределить некоторые файлы, добавьте другие), чем я, возможно, смогу добиться желаемого поведения с РДУ качестве что ж.

выходной формат передается в: saveAsNewAPIHadoopFile (который также называется методом saveAsTextFile для фактического сохранения файлов). И выходной коммиттер настроен на уровне приложения.