Как происходит разделение записей процесса Hadoop по границам блоков?
по словам Hadoop - The Definitive Guide
логические записи, которые определяют FileInputFormats, обычно не вписываются в блоки HDFS. Например, логичным TextInputFormat записи-это линии, которые будут пересекать границы файловой системы HDFS чаще, чем нет. Это не имеет никакого отношения к функционированию вашей программы-линии не пропущены или сломаны, например-но это стоит знать, так как это означает, что данные-локальные карты (то есть карты, которые работают на том же хосте, что и их входные данные) будет выполнять некоторые удаленные чтения. Небольшие накладные расходы, которые это вызывает, обычно не являются значительными.
предположим, что строка записи разделена на два блока (b1 и b2). Картограф, обрабатывающий первый блок (b1), заметит, что последняя строка не имеет разделителя EOL и извлекает оставшуюся часть строки из следующего блока данных (b2).
как картограф, обрабатывающий второй блок (b2), определяет, что первая запись является неполной и должна процесс начинается со второй записи в блоке (b2)?
6 ответов:
интересный вопрос, я потратил некоторое время, глядя на код для деталей, и вот мои мысли. Расколы обрабатываются клиентом с помощью
InputFormat.getSplits
, поэтому взгляд на FileInputFormat дает следующую информацию:
- для каждого входного файла, получить длину файла, размер блока и рассчитать размер разделения как
max(minSize, min(maxSize, blockSize))
здесьmaxSize
соответствуетmapred.max.split.size
иminSize
иmapred.min.split.size
.разделите файл на разные
FileSplit
s на основе разделения размер рассчитан выше. Здесь важно то, что каждогоFileSplit
инициализируется сstart
параметр, соответствующий смещению во входном файле. В этот момент все еще нет обработки линий. Соответствующая часть кода выглядит так:while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(new FileSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; }
после этого, если вы посмотрите на
LineRecordReader
который определяетсяTextInputFormat
, вот где обрабатываются строки:
- при инициализации
LineRecordReader
он пытается создать экземплярLineReader
который является абстракцией, чтобы иметь возможность читать строки надFSDataInputStream
. Есть 2 случая:- если есть
CompressionCodec
определено, то этот кодек отвечает за обработку границ. Вероятно, не имеет отношения к вашему вопросу.если нет кодека, однако, вот где все интересно: если
start
вашегоInputSplit
отличается от 0, то вернуться 1 символ, а затем пропустить первую строку вы сталкиваетесь с определением \n или \r\n (Windows) ! Обратный путь важен, потому что в случае, если ваши границы линий совпадают с разделенными границами, это гарантирует, что вы не пропустите допустимую линию. Вот соответствующий код:if (codec != null) { in = new LineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { if (start != 0) { skipFirstLine = true; --start; fileIn.seek(start); } in = new LineReader(fileIn, job); } if (skipFirstLine) { // skip first line and re-establish "start". start += in.readLine(new Text(), 0, (int)Math.min((long)Integer.MAX_VALUE, end - start)); } this.pos = start;
так как разбиения вычисляются в клиенте, картографы не нужно запускать последовательно, каждый картограф уже знает, если он neds, чтобы отбросить первую строку или нет.
Итак, если у вас есть 2 линии каждый 100Mb в том же файле, и для упрощения скажем, что размер разделения составляет 64Mb. Затем, когда входные разбиения вычисляются, мы будем иметь следующий сценарий:
- разделить 1, содержащий путь и хосты к этому блоку. Инициализируется в начале 200-200=0 Мб, длина 64 Мб.
- Split 2 инициализируется в начале 200-200+64=64 Мб, длина 64 Мб.
- Split 3 инициализируется в начале 200-200+128=128Mb, длина 64Mb.
- Split 4 инициализируется при запуске 200-200+192=192 Мб, длина 8 МБ.
- Mapper A будет обрабатывать split 1, start равен 0, поэтому не пропустите первую строку и прочитайте полную строку, которая выходит за пределы 64 Мб, поэтому требуется удаленное чтение.
- Mapper B будет обрабатывать split 2, start is != 0 поэтому пропустите первую строку после 64Mb-1byte, что соответствует концу строки 1 на 100Mb, который все еще находится в split 2, у нас есть 28Mb строки в split 2, поэтому удаленное чтение оставшихся 72Mb.
- Mapper C обработает разделение 3, начало есть != 0 поэтому пропустите первую строку после 128Mb-1byte, что соответствует концу строки 2 на 200Mb, что является концом файла, поэтому ничего не делайте.
- Mapper D такой же, как mapper C, за исключением того, что он ищет новую строку после 192 Мб-1байт.
Карта Reduece алгоритм не работает на физических блоках файла. Он работает на логических входных расщеплений. Разделение ввода зависит от того, где была записана запись. Запись может охватывать два картографа.
путь HDFS был настроен, он разбивает очень большие файлы на большие блоки (например, размером 128 МБ) и сохраняет три копии этих блоков на разных узлах кластера.
HDFS не имеет никакого представления о содержании этих файлы. Запись, возможно, была запущена в блок - но конец этой записи может присутствовать в блок-Б.
чтобы решить эту проблему, Hadoop использует логическое представление данных, хранящихся в блоках файлов, известных как входные разбиения. Когда клиент задания MapReduce вычисляет входной сигнал расщепляется,он вычисляет, где начинается первая целая запись в блоке и где последняя запись в блоке заканчивается.
ключевой момент :
в тех случаях, когда последняя запись в блоке является неполной, входное разделение включает информацию о местоположении для следующего блока и смещение байтов данных, необходимых для завершения записи.
посмотрите на диаграмму ниже.
взгляните на это статьи и связанные с ним SE вопрос : о разделении файлов Hadoop / HDFS
больше деталей можно прочитать от документация
структура Map-Reduce зависит от InputFormat задания:
- проверять входные данные-спецификация работы.
- разделить входной файл(ы) на логические InputSplits, каждый из которых затем присваивается отдельному Mapper.
- каждый InputSplit затем присваивается отдельному пользователю Картограф для обработки. Сплит может быть кортеж.
InputSplit[] getSplits(JobConf job,int numSplits
) является API, чтобы заботиться о таких вещах.FileInputFormat, которая расширяет
InputFormat
реализовалаgetSplits
() метод. Взгляните на внутренности этого метода в grepcode
Я вижу это следующим образом: InputFormat отвечает за разделение данных на логические разбиения с учетом характера данных.
ничто не мешает ему сделать это, хотя это может добавить значительную задержку к заданию - вся логика и чтение вокруг желаемых границ размера разделения произойдет в jobtracker.
Самый простой формат ввода с учетом записи-TextInputFormat. Он работает следующим образом (насколько я понял из кода) - формат ввода разделять по размеру, независимо от строк, но LineRecordReader всегда:
а) пропустить первую строку в Сплит (или ее часть), если это не первый сплит
б) прочитать одну строку после границы разбиения в конце (если данные имеются, значит это не последнее разбиение).
из того, что я понял, когда
FileSplit
инициализируется для первого блока, вызывается конструктор по умолчанию. Поэтому значения для начала и длины изначально равны нулю. К концу обработки первого блока, если последняя строка является неполной, то значение длины будет больше, чем длина разделения, и он будет читать первую строку следующего блока, а также. Благодаря этому значения для первого блока будет больше нуля и при этом условииLineRecordReader
пропустит первую строку второго блока. (См.источник)если последняя строка первого блока завершена, то значение длины будет равно длине первого блока и значение начала для второго блока будет равно нулю. В таком случае
LineRecordReader
не будет пропускать первую строку и читать второй блок в начале.смысл?
из исходного кода Hadoop LineRecordReader.java конструктор: я нахожу некоторые комментарии:
// If this is not the first split, we always throw away first record // because we always (except the last split) read one extra line in // next() method. if (start != 0) { start += in.readLine(new Text(), 0, maxBytesToConsume(start)); } this.pos = start;
из этого я считаю, что hadoop будет читать одну дополнительную строку для каждого раскола (в конце текущего раскола, прочитайте следующую строку в следующем расколе), и если не первый раскол, первая строка будет выброшена. так что ни одна строка записи не будет потеряна и неполной