ThreadPoolExecutor с неограниченной очередью не создает новые потоки


Мой ThreadPoolExecutor не может создать новые потоки. На самом деле я написал несколько хаки LinkedBlockingQueue, который примет любую задачу (т. е. она неограничена), но вызовет дополнительный обработчик - который в моем приложении извергает предупреждение трассировки, что пул позади - который дает мне очень явную информацию, что TPE отказывается создавать новые потоки, даже если очередь имеет тысячи записей в нем. Мой конструктор выглядит следующим образом:

private final ExecutorService s3UploadPool = 
new ThreadPoolExecutor(1, 40, 1, TimeUnit.HOURS, unboundedLoggingQueue);

Почему он не создает новые потоки?

3 16

3 ответа:

Об этом рассказывается в этой записи в блоге:

Эта конструкция пула потоков просто не будет работать должным образом. Это происходит из-за логики в ThreadPoolExecutor, где новые потоки добавляются, если не удается предложить задачу очереди. В нашем случае мы используем неограниченную LinkedBlockingQueue, где мы всегда можем предложить задачу очереди. Это фактически означает, что мы никогда не вырастем выше размера основного пула и до максимального пула размер.

Если Вам также нужно отделить минимальный размер пула от максимального, вам придется выполнить некоторое расширенное кодирование. Я не знаю решения, которое существует в библиотеках Java или Apache Commons. Решение состоит в том, чтобы создать связанный BlockingQueue, который знает о TPE, и будет изо всех сил отклонять задачу, если он знает, что TPE не имеет доступных потоков, а затем вручную запрашивать. Более подробно об этом говорится в статье linked post. В конечном счете ваша конструкция будет выглядеть например:

public static ExecutorService newScalingThreadPool(int min, int max, long keepAliveTime) {
   ScalingQueue queue = new ScalingQueue();
   ThreadPoolExecutor executor =
      new ScalingThreadPoolExecutor(min, max, keepAliveTime, TimeUnit.MILLISECONDS, queue);
   executor.setRejectedExecutionHandler(new ForceQueuePolicy());
   queue.setThreadPoolExecutor(executor);
   return executor;
}

Однако проще установить corePoolSize в maxPoolSize и не беспокоиться об этой ерунде.

Как упоминал @djechlin, это часть (удивительного для многих) определенного поведения ThreadPoolExecutor. Я считаю, что нашел несколько элегантное решение вокруг этого поведения, которое я показываю в своем ответе здесь:

Как заставить ThreadPoolExecutor увеличить потоки до максимума перед очередью?

В основном вы расширяете LinkedBlockingQueue, чтобы он всегда возвращал false для queue.offer(...), что добавит дополнительные потоки в пул, если это необходимо. Если бассейн уже на максимуме потоки и они все заняты, будет вызван RejectedExecutionHandler. Это обработчик, который затем делает put(...) в очередь.

Смотрите мой код там.

Существует обходной путь решения этой проблемы. Рассмотрим следующую реализацию:

int corePoolSize = 40;
int maximumPoolSize = 40;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 
    60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
threadPoolExecutor.allowCoreThreadTimeOut(true);

Установив allowCoreThreadTimeOut() в true, потокам в пуле разрешается завершать работу после указанного таймаута (60 секунд в данном примере). В этом решении именно аргумент конструктора corePoolSize определяет максимальный размер пула на практике, поскольку пул потоков вырастет до corePoolSize, а затем начнет добавлять задания в очередь. Вполне вероятно, что пул никогда не может стать больше, чем это, потому что пул не будет порождать новые потоки, пока очередь не заполнится (что, учитывая, что LinkedBlockingQueue имеет емкость Integer.MAX_VALUE, может никогда не произойти). Следовательно, нет никакого смысла устанавливать maximumPoolSize на большее значение, чем corePoolSize.

Внимание: пул потоков имеет 0 незанятых потоков после истечения таймаута, что означает, что будет некоторая задержка перед созданием потоков (обычно у вас всегда есть corePoolSize потоки доступный).

Более подробную информацию можно найти в JavaDocThreadPoolExecutor .