Какой вариант выбрать для записи CSV-файла в Spark (HDFS)?


Мне нужно сравнить CSV-файлы, а затем удалить все повторяющиеся строки. Итак, мое состояние похоже на то, что у меня есть одна папка, и я должен поместить каждый отфильтрованный результат в эту папку, и когда появится какой-то новый файл, я должен сравнить существующие файлы в папке с новым и, наконец, я должен вернуть результат в ту же папку.

eg: /data/ingestion/file1.csv

   a1 b1 c1

   a2 b2 c2

   a3 b3 c3

/data/ingestion/file2.csv

   a4 b4 c4

   a5 b5 c5

   a6 b6 c6

new upcoming file(upcoming_file.csv):

   a1 b1 c1

   a5 b5 c5

   a7 b7 c7

Теперь мой подход заключается в создании одного фрейма данных из всех файлов, присутствующих в файле / data / ingestion/*. Затем создание одного фрейма данных upcoming_file.csv и добавление их обоих с помощью операции объединения. Наконец, применение четкой трансформации. Теперь я должен записать его обратно в / data / ingestion, убедившись, что никакой двуличности не будет. Итак, я выбираю операцию перезаписи.

deleted_duplicate.write
  .format("csv")
  .mode("overwrite")
  .save("hdfs://localhost:8020/data/ingestion/")

Затем я, наконец, удаляю все, что находится в папке / data / ingestion. Даже новый фрейм данных не записывается в CSV-файлы.

Я пробовал и другие варианты, но не достиг того, что я объяснил. выше!

Заранее спасибо!

1 2

1 ответ:

Я предлагаю записать вывод в новый каталог на hdfs - в случае сбоя обработки вы всегда сможете отбросить все, что было обработано, и запустить обработку с нуля с исходными данными-это безопасно и просто. :)

Когда обработка завершена-просто удалите старый и переименуйте новый в имя старого.

Обновление:

deleted_duplicate.write
  .format("csv")
  .mode("overwrite")
  .save("hdfs://localhost:8020/data/ingestion_tmp/")

   Configuration conf = new Configuration();
    conf.set("fs.hdfs.impl",org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
    conf.set("fs.file.impl",org.apache.hadoop.fs.LocalFileSystem.class.getName());
    FileSystem  hdfs = FileSystem.get(URI.create("hdfs://<namenode-hostname>:<port>"), conf);
    hdfs.delete("hdfs://localhost:8020/data/ingestion", isRecusrive);
    hdfs.rename("hdfs://localhost:8020/data/ingestion_tmp", "hdfs://localhost:8020/data/ingestion");

Здесь ссылка на HDFS FileSystem API docs