объединить выходные файлы после фазы уменьшения


в mapreduce каждая задача reduce записывает свои выходные данные в файл с именем part-r-nnnnn здесь nnnnn - идентификатор раздела, связанный с задачей сокращения. Делает map / reduce объединить эти файлы? Если да, то как?

10 72

10 ответов:

вместо того, чтобы выполнять слияние файлов самостоятельно, вы можете делегировать все слияние выходных файлов reduce, вызвав:

hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt

Примечание Это сочетает в файловой системе HDFS файлы локально. Убедитесь, что у вас достаточно места на диске перед запуском

нет, эти файлы не объединяются Hadoop. Количество файлов, которые вы получаете, совпадает с количеством задач сокращения.

Если вам это нужно в качестве входных данных для следующего задания, то не беспокойтесь о наличии отдельных файлов. Просто укажите весь каталог в качестве входных данных для следующего задания.

Если вам нужны данные за пределами кластера, то я обычно объединяю их на приемном конце при извлечении данных из кластера.

т. е. что-то вроде это:

hadoop fs -cat /some/where/on/hdfs/job-output/part-r-* > TheCombinedResultOfTheJob.txt

это функция, которую вы можете использовать для объединения файлов в HDFS

public boolean getMergeInHdfs(String src, String dest) throws IllegalArgumentException, IOException {
    FileSystem fs = FileSystem.get(config);
    Path srcPath = new Path(src);
    Path dstPath = new Path(dest);

    // Check if the path already exists
    if (!(fs.exists(srcPath))) {
        logger.info("Path " + src + " does not exists!");
        return false;
    }

    if (!(fs.exists(dstPath))) {
        logger.info("Path " + dest + " does not exists!");
        return false;
    }
    return FileUtil.copyMerge(fs, srcPath, fs, dstPath, false, config, null);
}

только для текстовых файлов и HDFS в качестве источника и назначения используйте следующую команду:

hadoop fs -cat /input_hdfs_dir/* | hadoop fs -put - /output_hdfs_file

это объединит все файлы в input_hdfs_dir и будет записывать вывод обратно в HDFS в output_hdfs_file. Имейте в виду, что все данные будут возвращены в локальную систему, а затем снова загружены в hdfs, хотя временные файлы не создаются, и это происходит на лету с помощью Unix pe.

кроме того, это не будет работать с нетекстовыми файлами, такие как Авро, орк и др.

для двоичных файлов вы можете сделать что-то вроде этого (если у вас есть таблицы Hive, отображенные в каталогах):

insert overwrite table tbl select * from tbl

в зависимости от вашей конфигурации, это также может создать несколько файлов. Чтобы создать один файл, либо установите число редукторов в 1 явно с помощью mapreduce.job.reduces=1 или установите свойство hive как hive.merge.mapredfiles=true.

вы можете запустить дополнительную задачу map / reduce, где map и reduce не изменяют данные, а partitioner назначает все данные одному редуктору.

файлы part-r-nnnnn генерируются после фазы сокращения, обозначенной 'r' между ними. Теперь дело в том, что если у вас работает один редуктор, у вас будет выходной файл, такой как part-r-00000. Если число редукторов равно 2, то вы будете иметь часть-r-00000 и часть-r-00001 и так далее. Смотрите, если выходной файл слишком велик, чтобы поместиться в память машины, так как платформа hadoop была разработана для работы на Товарные Машины, затем файл разделяется. В соответствии с MRv1, у вас есть предел 20 редукторов для работы над вашей логикой. У вас может быть больше, но то же самое должно быть настроено в файлах конфигурации mapred-сайте.xml. Говоря о вашем вопросе; вы можете либо использовать getmerge, либо вы можете установить количество редукторов в 1, вставив следующий оператор в код драйвера

job.setNumReduceTasks(1);

надеюсь, что это ответ на ваш вопрос.

кроме моего предыдущего ответа у меня есть еще один ответ для вас, который я пытался несколько минут назад. Вы можете использовать CustomOutputFormat который выглядит как код, приведенный ниже

public class VictorOutputFormat extends FileOutputFormat<StudentKey,PassValue> {

    @Override
    public RecordWriter<StudentKey,PassValue> getRecordWriter(
            TaskAttemptContext tac) throws IOException, InterruptedException {
        //step 1: GET THE CURRENT PATH
        Path currPath=FileOutputFormat.getOutputPath(tac);

        //Create the full path
        Path fullPath=new Path(currPath,"Aniruddha.txt");

        //create the file in the file system
        FileSystem fs=currPath.getFileSystem(tac.getConfiguration());
        FSDataOutputStream fileOut=fs.create(fullPath,tac);
        return new VictorRecordWriter(fileOut);
    }

}

просто взгляните на четвертую строку из последней. Я использовал свое собственное имя в качестве имени выходного файла, и я протестировал программу с 15 редукторами. Тем не менее файл остается тем же самым. Таким образом, получение одного файла вместо двух или более возможно еще быть очень ясным размер выходного файла не должен превышать размер первичной памяти, т. е. выходной файл должен помещаться в память товарной машины, иначе может возникнуть проблема с разделением выходного файла. Спасибо!!

почему бы не использовать скрипт pig, подобный этому, для объединения файлов разделов:

stuff = load "/path/to/dir/*"

store stuff into "/path/to/mergedir"

если файлы имеют заголовок, вы можете избавиться от него, сделав это:

hadoop fs -cat /path/to/hdfs/job-output/part-* | grep -v "header" > output.csv

добавить заголовок вручную для вывода.csv

. Map / reduce объединяет эти файлы?

нет. Он не сливается.

можно использовать IdentityReducer для достижения своей цели.

не выполняет никакого сокращения, записывая все входные значения непосредственно на выход.

public void reduce(K key,
                   Iterator<V> values,
                   OutputCollector<K,V> output,
                   Reporter reporter)
            throws IOException

пишет все ключи и значения непосредственно на выходных.

посмотрите на связанные сообщения SE:

hadoop: разница между 0 редуктор и редуктор тож?