ThreadPoolExecutor с неограниченной очередью не создает новые потоки
Мой ThreadPoolExecutor
не может создать новые потоки. На самом деле я написал несколько хаки LinkedBlockingQueue
, который примет любую задачу (т. е. она неограничена), но вызовет дополнительный обработчик - который в моем приложении извергает предупреждение трассировки, что пул позади - который дает мне очень явную информацию, что TPE отказывается создавать новые потоки, даже если очередь имеет тысячи записей в нем. Мой конструктор выглядит следующим образом:
private final ExecutorService s3UploadPool =
new ThreadPoolExecutor(1, 40, 1, TimeUnit.HOURS, unboundedLoggingQueue);
Почему он не создает новые потоки?
3 ответа:
Об этом рассказывается в этой записи в блоге:
Эта конструкция пула потоков просто не будет работать должным образом. Это происходит из-за логики в ThreadPoolExecutor, где новые потоки добавляются, если не удается предложить задачу очереди. В нашем случае мы используем неограниченную LinkedBlockingQueue, где мы всегда можем предложить задачу очереди. Это фактически означает, что мы никогда не вырастем выше размера основного пула и до максимального пула размер.
Если Вам также нужно отделить минимальный размер пула от максимального, вам придется выполнить некоторое расширенное кодирование. Я не знаю решения, которое существует в библиотеках Java или Apache Commons. Решение состоит в том, чтобы создать связанный
BlockingQueue
, который знает о TPE, и будет изо всех сил отклонять задачу, если он знает, что TPE не имеет доступных потоков, а затем вручную запрашивать. Более подробно об этом говорится в статье linked post. В конечном счете ваша конструкция будет выглядеть например:public static ExecutorService newScalingThreadPool(int min, int max, long keepAliveTime) { ScalingQueue queue = new ScalingQueue(); ThreadPoolExecutor executor = new ScalingThreadPoolExecutor(min, max, keepAliveTime, TimeUnit.MILLISECONDS, queue); executor.setRejectedExecutionHandler(new ForceQueuePolicy()); queue.setThreadPoolExecutor(executor); return executor; }
Однако проще установить
corePoolSize
вmaxPoolSize
и не беспокоиться об этой ерунде.
Как упоминал @djechlin, это часть (удивительного для многих) определенного поведения
ThreadPoolExecutor
. Я считаю, что нашел несколько элегантное решение вокруг этого поведения, которое я показываю в своем ответе здесь:Как заставить ThreadPoolExecutor увеличить потоки до максимума перед очередью?
В основном вы расширяете
LinkedBlockingQueue
, чтобы он всегда возвращал false дляqueue.offer(...)
, что добавит дополнительные потоки в пул, если это необходимо. Если бассейн уже на максимуме потоки и они все заняты, будет вызванRejectedExecutionHandler
. Это обработчик, который затем делаетput(...)
в очередь.Смотрите мой код там.
Существует обходной путь решения этой проблемы. Рассмотрим следующую реализацию:
int corePoolSize = 40; int maximumPoolSize = 40; ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor.allowCoreThreadTimeOut(true);
Установив allowCoreThreadTimeOut() в
true
, потокам в пуле разрешается завершать работу после указанного таймаута (60 секунд в данном примере). В этом решении именно аргумент конструктораcorePoolSize
определяет максимальный размер пула на практике, поскольку пул потоков вырастет доcorePoolSize
, а затем начнет добавлять задания в очередь. Вполне вероятно, что пул никогда не может стать больше, чем это, потому что пул не будет порождать новые потоки, пока очередь не заполнится (что, учитывая, чтоLinkedBlockingQueue
имеет емкостьInteger.MAX_VALUE
, может никогда не произойти). Следовательно, нет никакого смысла устанавливатьmaximumPoolSize
на большее значение, чемcorePoolSize
.Внимание: пул потоков имеет 0 незанятых потоков после истечения таймаута, что означает, что будет некоторая задержка перед созданием потоков (обычно у вас всегда есть
corePoolSize
потоки доступный).Более подробную информацию можно найти в JavaDocThreadPoolExecutor .