Как происходит разделение записей процесса Hadoop по границам блоков?


по словам Hadoop - The Definitive Guide

логические записи, которые определяют FileInputFormats, обычно не вписываются в блоки HDFS. Например, логичным TextInputFormat записи-это линии, которые будут пересекать границы файловой системы HDFS чаще, чем нет. Это не имеет никакого отношения к функционированию вашей программы-линии не пропущены или сломаны, например-но это стоит знать, так как это означает, что данные-локальные карты (то есть карты, которые работают на том же хосте, что и их входные данные) будет выполнять некоторые удаленные чтения. Небольшие накладные расходы, которые это вызывает, обычно не являются значительными.

предположим, что строка записи разделена на два блока (b1 и b2). Картограф, обрабатывающий первый блок (b1), заметит, что последняя строка не имеет разделителя EOL и извлекает оставшуюся часть строки из следующего блока данных (b2).

как картограф, обрабатывающий второй блок (b2), определяет, что первая запись является неполной и должна процесс начинается со второй записи в блоке (b2)?

6 109

6 ответов:

интересный вопрос, я потратил некоторое время, глядя на код для деталей, и вот мои мысли. Расколы обрабатываются клиентом с помощью InputFormat.getSplits, поэтому взгляд на FileInputFormat дает следующую информацию:

  • для каждого входного файла, получить длину файла, размер блока и рассчитать размер разделения как max(minSize, min(maxSize, blockSize)) здесь maxSize соответствует mapred.max.split.size и minSize и mapred.min.split.size.
  • разделите файл на разные FileSplits на основе разделения размер рассчитан выше. Здесь важно то, что каждого FileSplit инициализируется с start параметр, соответствующий смещению во входном файле. В этот момент все еще нет обработки линий. Соответствующая часть кода выглядит так:

    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
    

после этого, если вы посмотрите на LineRecordReader который определяется TextInputFormat, вот где обрабатываются строки:

  • при инициализации LineRecordReader он пытается создать экземпляр LineReader который является абстракцией, чтобы иметь возможность читать строки над FSDataInputStream. Есть 2 случая:
  • если есть CompressionCodec определено, то этот кодек отвечает за обработку границ. Вероятно, не имеет отношения к вашему вопросу.
  • если нет кодека, однако, вот где все интересно: если start вашего InputSplit отличается от 0, то вернуться 1 символ, а затем пропустить первую строку вы сталкиваетесь с определением \n или \r\n (Windows) ! Обратный путь важен, потому что в случае, если ваши границы линий совпадают с разделенными границами, это гарантирует, что вы не пропустите допустимую линию. Вот соответствующий код:

    if (codec != null) {
       in = new LineReader(codec.createInputStream(fileIn), job);
       end = Long.MAX_VALUE;
    } else {
       if (start != 0) {
         skipFirstLine = true;
         --start;
         fileIn.seek(start);
       }
       in = new LineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                        (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
    

так как разбиения вычисляются в клиенте, картографы не нужно запускать последовательно, каждый картограф уже знает, если он neds, чтобы отбросить первую строку или нет.

Итак, если у вас есть 2 линии каждый 100Mb в том же файле, и для упрощения скажем, что размер разделения составляет 64Mb. Затем, когда входные разбиения вычисляются, мы будем иметь следующий сценарий:

  • разделить 1, содержащий путь и хосты к этому блоку. Инициализируется в начале 200-200=0 Мб, длина 64 Мб.
  • Split 2 инициализируется в начале 200-200+64=64 Мб, длина 64 Мб.
  • Split 3 инициализируется в начале 200-200+128=128Mb, длина 64Mb.
  • Split 4 инициализируется при запуске 200-200+192=192 Мб, длина 8 МБ.
  • Mapper A будет обрабатывать split 1, start равен 0, поэтому не пропустите первую строку и прочитайте полную строку, которая выходит за пределы 64 Мб, поэтому требуется удаленное чтение.
  • Mapper B будет обрабатывать split 2, start is != 0 поэтому пропустите первую строку после 64Mb-1byte, что соответствует концу строки 1 на 100Mb, который все еще находится в split 2, у нас есть 28Mb строки в split 2, поэтому удаленное чтение оставшихся 72Mb.
  • Mapper C обработает разделение 3, начало есть != 0 поэтому пропустите первую строку после 128Mb-1byte, что соответствует концу строки 2 на 200Mb, что является концом файла, поэтому ничего не делайте.
  • Mapper D такой же, как mapper C, за исключением того, что он ищет новую строку после 192 Мб-1байт.

Карта Reduece алгоритм не работает на физических блоках файла. Он работает на логических входных расщеплений. Разделение ввода зависит от того, где была записана запись. Запись может охватывать два картографа.

путь HDFS был настроен, он разбивает очень большие файлы на большие блоки (например, размером 128 МБ) и сохраняет три копии этих блоков на разных узлах кластера.

HDFS не имеет никакого представления о содержании этих файлы. Запись, возможно, была запущена в блок - но конец этой записи может присутствовать в блок-Б.

чтобы решить эту проблему, Hadoop использует логическое представление данных, хранящихся в блоках файлов, известных как входные разбиения. Когда клиент задания MapReduce вычисляет входной сигнал расщепляется,он вычисляет, где начинается первая целая запись в блоке и где последняя запись в блоке заканчивается.

ключевой момент :

в тех случаях, когда последняя запись в блоке является неполной, входное разделение включает информацию о местоположении для следующего блока и смещение байтов данных, необходимых для завершения записи.

посмотрите на диаграмму ниже.

enter image description here

взгляните на это статьи и связанные с ним SE вопрос : о разделении файлов Hadoop / HDFS

больше деталей можно прочитать от документация

структура Map-Reduce зависит от InputFormat задания:

  1. проверять входные данные-спецификация работы.
  2. разделить входной файл(ы) на логические InputSplits, каждый из которых затем присваивается отдельному Mapper.
  3. каждый InputSplit затем присваивается отдельному пользователю Картограф для обработки. Сплит может быть кортеж. InputSplit[] getSplits(JobConf job,int numSplits) является API, чтобы заботиться о таких вещах.

FileInputFormat, которая расширяет InputFormat реализовала getSplits() метод. Взгляните на внутренности этого метода в grepcode

Я вижу это следующим образом: InputFormat отвечает за разделение данных на логические разбиения с учетом характера данных.
ничто не мешает ему сделать это, хотя это может добавить значительную задержку к заданию - вся логика и чтение вокруг желаемых границ размера разделения произойдет в jobtracker.
Самый простой формат ввода с учетом записи-TextInputFormat. Он работает следующим образом (насколько я понял из кода) - формат ввода разделять по размеру, независимо от строк, но LineRecordReader всегда:
а) пропустить первую строку в Сплит (или ее часть), если это не первый сплит
б) прочитать одну строку после границы разбиения в конце (если данные имеются, значит это не последнее разбиение).

из того, что я понял, когда FileSplit инициализируется для первого блока, вызывается конструктор по умолчанию. Поэтому значения для начала и длины изначально равны нулю. К концу обработки первого блока, если последняя строка является неполной, то значение длины будет больше, чем длина разделения, и он будет читать первую строку следующего блока, а также. Благодаря этому значения для первого блока будет больше нуля и при этом условии LineRecordReader пропустит первую строку второго блока. (См.источник)

если последняя строка первого блока завершена, то значение длины будет равно длине первого блока и значение начала для второго блока будет равно нулю. В таком случае LineRecordReader не будет пропускать первую строку и читать второй блок в начале.

смысл?

из исходного кода Hadoop LineRecordReader.java конструктор: я нахожу некоторые комментарии:

// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
  start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;

из этого я считаю, что hadoop будет читать одну дополнительную строку для каждого раскола (в конце текущего раскола, прочитайте следующую строку в следующем расколе), и если не первый раскол, первая строка будет выброшена. так что ни одна строка записи не будет потеряна и неполной

картографы не должны общаться. Блоки файлов находятся в HDFS и могут текущий mapper(RecordReader) может прочитать блок, который имеет оставшуюся часть строки. Это происходит за кулисами.