Как избежать исключения Parquet MemoryManager


Я генерирую некоторые паркетные (v1.6.0) выходные данные из скрипта PIG (v0.15.0). Мой скрипт берет несколько входных источников и соединяет их с некоторой вложенностью. Скрипт работает без ошибок, но затем во время операции STORE я получаю:

2016-04-19 17:24:36,299 [PigTezLauncher-0] INFO  org.apache.pig.backend.hadoop.executionengine.tez.TezJob - DAG Status: status=FAILED, progress=TotalTasks: 249 Succeeded: 220 Running: 0 Failed: 1 Killed: 28 FailedTaskAttempts: 43, diagnostics=Vertex failed, vertexName=scope-1446, vertexId=vertex_1460657535752_15030_1_18, diagnostics=[Task failed, taskId=task_1460657535752_15030_1_18_000000, diagnostics=[TaskAttempt 0 failed, info=[Error: Failure while running task:parquet.hadoop.MemoryManager$1: New Memory allocation 134217728 exceeds minimum allocation size 1048576 with largest schema having 132 columns
    at parquet.hadoop.MemoryManager.updateAllocation(MemoryManager.java:125)
    at parquet.hadoop.MemoryManager.addWriter(MemoryManager.java:82)
    at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:104)
    at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:309)
    at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat.getRecordWriter(PigOutputFormat.java:81)
    at org.apache.tez.mapreduce.output.MROutput.initialize(MROutput.java:398)
    ...

Вышеприведенное исключение было вызвано при выполнении скрипта с использованием -x tez, но я получаю то же исключение при использовании mapreduce. Я попытался увеличить распараллеливание, используя SET default_parallel, а также добавив (ненужную w.r.t. мои реальные цели) ORDER BY операцию непосредственно перед моими операциями STORE, чтобы гарантировать, что PIG имеет возможность отправлять данные в различные редукторы и минимизировать память, требуемую на любом данном редукторе. Наконец, я попытался увеличить доступную память, используя SET mapred.child.java.opts. Однако ничего из этого не помогло.

Есть ли что-то, что я просто упускаю? Существуют ли известные стратегии для избежания проблемы одного редуктора, несущего слишком большую нагрузку и вызывающего сбои во время записи? Я сталкивался с подобными проблемами, когда писал в avro выходные данные, которые, по-видимому, вызваны недостаточным объемом памяти для выполнения шага сжатия.

EDIT: per this source file проблема, похоже, сводится к тому, что memAllocation/nCols<minMemAllocation. Однако на распределение памяти, похоже, не влияет настройка mapred.child.java.opts, которую я опробовал.

1 3

1 ответ:

Я решил это окончательно, используя параметр parquet.block.size. Значение по умолчанию (см. source ) достаточно велико, чтобы записать файл шириной 128 столбцов, но не больше. Решением в pig было использовать SET parquet.block.size x;, где x >= y * 1024^2 и y - количество столбцов в выводе.