Какой вариант выбрать для записи CSV-файла в Spark (HDFS)?
Мне нужно сравнить CSV-файлы, а затем удалить все повторяющиеся строки. Итак, мое состояние похоже на то, что у меня есть одна папка, и я должен поместить каждый отфильтрованный результат в эту папку, и когда появится какой-то новый файл, я должен сравнить существующие файлы в папке с новым и, наконец, я должен вернуть результат в ту же папку.
eg: /data/ingestion/file1.csv a1 b1 c1 a2 b2 c2 a3 b3 c3 /data/ingestion/file2.csv a4 b4 c4 a5 b5 c5 a6 b6 c6 new upcoming file(upcoming_file.csv): a1 b1 c1 a5 b5 c5 a7 b7 c7
Теперь мой подход заключается в создании одного фрейма данных из всех файлов, присутствующих в файле / data / ingestion/*. Затем создание одного фрейма данных upcoming_file.csv и добавление их обоих с помощью операции объединения. Наконец, применение четкой трансформации. Теперь я должен записать его обратно в / data / ingestion, убедившись, что никакой двуличности не будет. Итак, я выбираю операцию перезаписи.
deleted_duplicate.write
.format("csv")
.mode("overwrite")
.save("hdfs://localhost:8020/data/ingestion/")
Затем я, наконец, удаляю все, что находится в папке / data / ingestion. Даже новый фрейм данных не записывается в CSV-файлы.
Я пробовал и другие варианты, но не достиг того, что я объяснил. выше!
Заранее спасибо!
1 ответ:
Я предлагаю записать вывод в новый каталог на hdfs - в случае сбоя обработки вы всегда сможете отбросить все, что было обработано, и запустить обработку с нуля с исходными данными-это безопасно и просто. :)
Когда обработка завершена-просто удалите старый и переименуйте новый в имя старого.
Обновление:
deleted_duplicate.write .format("csv") .mode("overwrite") .save("hdfs://localhost:8020/data/ingestion_tmp/") Configuration conf = new Configuration(); conf.set("fs.hdfs.impl",org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); conf.set("fs.file.impl",org.apache.hadoop.fs.LocalFileSystem.class.getName()); FileSystem hdfs = FileSystem.get(URI.create("hdfs://<namenode-hostname>:<port>"), conf); hdfs.delete("hdfs://localhost:8020/data/ingestion", isRecusrive); hdfs.rename("hdfs://localhost:8020/data/ingestion_tmp", "hdfs://localhost:8020/data/ingestion");
Здесь ссылка на HDFS FileSystem API docs