Как перезаписать выходной каталог в spark
У меня есть приложение Spark streaming, которое создает набор данных для каждой минуты. Мне нужно сохранить/перезаписать результаты обработанных данных.
когда я попытался перезаписать организацию набора данных.апаш.платформа Hadoop.mapred.FileAlreadyExistsException останавливает выполнение.
Я установил свойство Spark set("spark.files.overwrite","true")
, но не повезло.
Как перезаписать или предварительно удалить файлы из spark?
8 ответов:
обновление: предложите использовать
Dataframes
, плюс что-то вроде... .write.mode(SaveMode.Overwrite) ...
.для более старых версий, попробуй
yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false") val sc = SparkContext(yourSparkConf)
в 1.1.0 вы можете установить настройки conf с помощью скрипта spark-submit с флагом --conf.
предупреждение: согласно @piggybox есть ошибка в Spark, где он будет только перезаписывать файлы, которые ему нужно написать, это
part-
файлы, любые другие файлы будут оставлены без изменений.
документация по параметру
spark.files.overwrite
говорит это: "нужно ли перезаписывать файлы, добавленные черезSparkContext.addFile()
когда целевой файл существует и его содержимое не совпадает с исходным."Таким образом, это не влияет на метод saveAsTextFiles.вы можете сделать это перед сохранением файла:
val hadoopConf = new org.apache.hadoop.conf.Configuration() val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf) try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } }
Аас объяснил здесь: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696.html
С pyspark.язык SQL.Фрейм данных.сохранить документация (в настоящее время в 1.3.1), вы можете указать
mode='overwrite'
при сохранении фрейма данных:myDataFrame.save(path='myPath', source='parquet', mode='overwrite')
Я проверил, что это даже удалит оставшиеся файлы разделов. Поэтому, если вы изначально сказали 10 разделов/файлов, но затем переписали папку с фреймом данных, который имел только 6 разделов, в результирующей папке будет 6 разделов/файлов.
посмотреть Spark SQL documentation дополнительные информация о параметрах режима.
С
df.save(path, source, mode)
является устаревшим, (http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame)использовать
df.write.format(source).mode("overwrite").save(path)
где ДФ.запись-это DataFrameWriter'source' может быть ("com.databricks.искра.Авро " / "паркет" / "json")
df.писать.режим ('перезапись').parquet ("/output/folder/path") работает, если вы хотите перезаписать файл parquet с помощью python. Это в spark 1.6.2. API может отличаться в более поздних версиях
val jobName = "WordCount"; //overwrite the output directory in spark set("spark.hadoop.validateOutputSpecs", "false") val conf = new SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false"); val sc = new SparkContext(conf)
Это перегруженная версия сохранить функция работает для меня:
yourDF.сохранить(поле "выходной путь", орг.апаш.искра.язык SQL.SaveMode.метод valueOf("переписать"))
в приведенном выше примере будет перезаписана существующая папка. Savemode также может принимать эти параметры (https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html):
добавить: режим добавления означает, что при сохранении фрейма данных в данные источник, если данные / таблица уже существуют, содержимое фрейма данных, как ожидается, будет добавлено к существующим данным.
ErrorIfExists: ErrorIfExists означает, что при сохранении фрейма данных в источник данных, если данные уже существуют, ожидается исключение.
игнорировать: режим игнорирования означает, что при сохранении фрейма данных в источник данных, если данные уже существуют, ожидается, что операция сохранения не сохранит содержимое фрейма данных и не менять существующие данные.
Если вы хотите использовать свой собственный формат вывода, вы также сможете получить желаемое поведение с помощью RDD.
посмотрите на следующие классы: FileOutputFormat, FileOutputCommitter
в выходном формате файла у вас есть метод с именем checkOutputSpecs, который проверяет, существует ли выходной каталог. В FileOutputCommitter у вас есть commitJob, который обычно передает данные из временного каталог на свое последнее место.
Я не был в состоянии проверить это пока нет (хотел сделать это, как только у меня появится несколько свободных минут) но теоретически: если я выражаю FileOutputFormat и переопределить checkOutputSpecs к методу, который не бросать исключение на каталог уже существует, и корректировать commitJob способ пользовательского вывода коммиттер для выполнения который-либо логики, что я хочу (например, переопределить некоторые файлы, добавьте другие), чем я, возможно, смогу добиться желаемого поведения с РДУ качестве что ж.
выходной формат передается в: saveAsNewAPIHadoopFile (который также называется методом saveAsTextFile для фактического сохранения файлов). И выходной коммиттер настроен на уровне приложения.