Невозможно создать кэшированный пул потоков с ограничением размера?


кажется невозможным создать кэшированный пул потоков с ограничением количества потоков, которые он может создать.

вот как статические исполнителей.newCachedThreadPool реализован в стандартной библиотеке Java:

 public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Итак, используя этот шаблон, чтобы создать кэшированный пул потоков фиксированного размера:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());

Теперь, если вы используете это и представить 3 задачи, все будет хорошо. Отправка любых дальнейших задач приведет к отклонению выполнения исключения.

пробуем этот:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());

приведет к тому, что все потоки будут выполняться последовательно. Т. е. пул потоков никогда не будет создавать более одного потока для обработки ваших задач.

это ошибка в методе выполнения ThreadPoolExecutor? Или, может быть, это намеренно? Или есть другой способ?

Edit: я хочу что-то точно такое же, как кэшированный пул потоков (он создает потоки по требованию, а затем убивает их после некоторого таймаута), но с помощью ограничение на количество потоков, которые он может создать, и возможность продолжать ставить в очередь дополнительные задачи, как только он достигнет своего предела потока. Согласно ответу sjlee это невозможно. Глядя на метод execute () ThreadPoolExecutor это действительно невозможно. Мне нужно было бы подкласс ThreadPoolExecutor и переопределить execute (), как это делает SwingWorker, но то, что SwingWorker делает в своем execute (), является полным взломом.

11 108

11 ответов:

ThreadPoolExecutor имеет следующие несколько ключевых поведений, и ваши проблемы могут быть объяснены этими поведениями.

при отправке заданий,

  1. если пул потоков не достиг размера ядра, он создает новые потоки.
  2. если размер ядра был достигнут и нет свободных потоков, он ставит задачи в очередь.
  3. если размер ядра был достигнут, то нет никаких праздных потоков, и очередь становится полной, она создает новые потоки (пока он не достигнет максимального размера).
  4. если максимальный размер был достигнут, нет никаких праздных потоков, и очередь становится полной, политика отклонения срабатывает.

в первом примере обратите внимание, что SynchronousQueue имеет по существу размер 0. Поэтому, как только вы достигнете максимального размера (3), политика отклонения срабатывает (#4).

во втором примере очередь выбора является LinkedBlockingQueue, который имеет неограниченный размер. Таким образом, вы получаете застрял с поведением #2.

вы не можете действительно много возиться с кэшированным типом или фиксированным типом, так как их поведение почти полностью определено.

Если вы хотите иметь ограниченный и динамический пул потоков, вам нужно использовать положительный размер ядра и максимальный размер в сочетании с очередью конечного размера. Например,

new ThreadPoolExecutor(10, // core size
    50, // max size
    10*60, // idle timeout
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(20)); // queue with a size

дополнительное соглашение: это довольно старый ответ, и кажется, что JDK изменил свое поведение, когда дело доходит до размера ядра 0. Начиная с JDK 1.6, если размер ядра равен 0 и пул не имеет потоков, ThreadPoolExecutor добавит поток для выполнения этой задачи. Поэтому размер ядра 0 является исключением из приведенного выше правила. Спасибо Стив на привлечении это к моему вниманию.

Если я что-то не пропустил, решение исходного вопроса просто. Следующий код реализует желаемое поведение, как описано автором. Он будет порождать до 5 потоков для работы в неограниченной очереди, а простаивающие потоки завершатся через 60 секунд.

tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
tp.allowCoreThreadTimeOut(true);

была такая же проблема. так как ни один другой ответ не ставит все вопросы вместе, я добавляю мой:

теперь это ясно написано в docs: если вы используете очередь, которая не блокирует (LinkedBlockingQueue) установка max threads не имеет никакого эффекта, используются только основные потоки.

так:

public class MyExecutor extends ThreadPoolExecutor {

    public MyExecutor() {
        super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        allowCoreThreadTimeOut(true);
    }

    public void setThreads(int n){
        setMaximumPoolSize(Math.max(1, n));
        setCorePoolSize(Math.max(1, n));
    }

}

Этот исполнитель:

  1. нет понятия max threads, поскольку мы используем неограниченную очередь. Это хорошо, потому что такие очередь может привести к тому, что исполнитель создаст огромное количество непрофильных, дополнительных потоков, если он будет следовать своей обычной политике.

  2. очередь максимального размера Integer.MAX_VALUE. Submit() бросит RejectedExecutionException если количество отложенных задач, превышает Integer.MAX_VALUE. Не уверен, что у нас закончится память в первую очередь или это произойдет.

  3. имеет 4 основных потока возможно. Неработающие потоки ядра автоматически выходят если неработающий для 5 seconds.So да, строго по запросу темы.Количество можно варьировать с помощью setThreads() метод.

  4. убедитесь, что минимальное количество основных потоков никогда не меньше одного, иначе submit() будет отклонять каждую задачу. Поскольку основные потоки должны быть >= max threads метод setThreads() устанавливает максимальные потоки, хотя настройка максимального потока бесполезна для неограниченной очереди.

в вашем первом примере, последующие задачи отклоняются, поскольку AbortPolicy по умолчанию RejectedExecutionHandler. ThreadPoolExecutor содержит следующие политики, которые можно изменить с помощью setRejectedExecutionHandler способ:

CallerRunsPolicy
AbortPolicy
DiscardPolicy
DiscardOldestPolicy

Это звучит, как вы хотите кэшировать поток бассейн с CallerRunsPolicy.

ни один из ответов здесь не исправил мою проблему, которая была связана с созданием ограниченного количества HTTP-соединений с использованием HTTP-клиента Apache (3.X версии). Поскольку мне потребовалось несколько часов, чтобы выяснить, хорошие настройки, я поделюсь:

private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L,
  TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
  Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

Это создает ThreadPoolExecutor, который начинается с пяти, и занимает не более десяти одновременно запущенных потоков с помощью CallerRunsPolicy для исполнения.

в Javadoc для ThreadPoolExecutor:

Если есть больше, чем corePoolSize, но меньше, чем maximumpoolsize потоков работает, новый поток будет создан только если очередь полна. Установив одинаковые значения corePoolSize и maximumPoolSize, вы создадите пул потоков фиксированного размера.

(выделено мной.)

ответ джиттера-это то, что вы хотите, хотя мой отвечает на ваш другой вопрос. :)

есть еще один вариант. Вместо использования new SynchronousQueue вы можете использовать любую другую очередь, но вы должны убедиться, что ее размер равен 1, так что заставит executorservice создать новый поток.

не похоже, что какой - либо из ответов на самом деле отвечает на вопрос - на самом деле я не вижу способа сделать это-даже если вы подкласс из PooledExecutorService, поскольку многие методы/свойства являются частными, например, создание addIfUnderMaximumPoolSize было защищено, вы могли бы сделать следующее:

class MyThreadPoolService extends ThreadPoolService {
    public void execute(Runnable run) {
        if (poolSize() == 0) {
            if (addIfUnderMaximumPoolSize(run) != null)
                return;
        }
        super.execute(run);
    }
}

ближе всего я получил это - но даже это не очень хорошее решение

new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
    public void execute(Runnable command) {
        if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) {        
            super.setCorePoolSize(super.getCorePoolSize() + 1);
        }
        super.execute(command);
    }

    protected void afterExecute(Runnable r, Throwable t) {
         // nothing in the queue
         if (getQueue().isEmpty() && getPoolSize() > min) {
             setCorePoolSize(getCorePoolSize() - 1);
         }
    };
 };

p. s. Не проверял выше

Это то, что вы хотите (по крайней мере я так думаю). Для объяснения проверьте Джонатан Файнберг ответ

Executors.newFixedThreadPool(int n)

создает пул потоков, который повторно использует фиксированное число потоков, работающих с общей неограниченной очередью. В любой момент не более nthreads потоков будут активными задачами обработки. Если дополнительные задачи отправляются, когда все потоки активны, они будут ждать в очереди, пока поток не будет доступен. Если какой-либо поток завершается из-за сбоя во время выполнения до завершения работы, новый займет свое место, если это необходимо для выполнения последующих задач. Потоки в пуле будут существовать до его явного завершения.

вот еще одно решение. Я думаю, что это решение ведет себя так, как вы хотите (хотя и не горжусь этим решением):

final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    public boolean offer(Runnable o) {
        if (size() > 1)
            return false;
        return super.offer(o);
    };

    public boolean add(Runnable o) {
        if (super.offer(o))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
};

RejectedExecutionHandler handler = new RejectedExecutionHandler() {         
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        queue.add(r);
    }
};

dbThreadExecutor =
        new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler);
  1. можно использовать ThreadPoolExecutor как было предложено @sjlee

    вы можете управлять размером пула динамически. Взгляните на этот вопрос для более подробной информации :

    Динамический Пул Потоков

    или

  2. можно использовать newWorkStealingPool API, который был представлен с java 8.

    public static ExecutorService newWorkStealingPool()
    

    создает пул рабочих потоков, используя все доступные процессоры в качестве целевого уровня параллелизма.

по умолчанию уровень параллелизма установлен на количество ядер ЦП на вашем сервере. Если у вас есть 4-ядерный сервер ЦП, размер пула потоков будет 4. Этот API возвращает ForkJoinPool типа ExecutorService и разрешить кражу работы из незанятых потоков путем кражи задач из занятых потоков в ForkJoinPool.