Как сделать блок метода submit() ThreadPoolExecutor, если он насыщен?
Я хочу создать ThreadPoolExecutor
таким образом, когда он достиг своего максимального размера и очередь заполнена,submit()
метод блокируется при попытке добавить новые задачи. Мне нужно реализовать пользовательский RejectedExecutionHandler
для этого и существует способ сделать это с помощью стандартной библиотеки Java?
19 ответов:
одно из возможных решений я нашел:
public class BoundedExecutor { private final Executor exec; private final Semaphore semaphore; public BoundedExecutor(Executor exec, int bound) { this.exec = exec; this.semaphore = new Semaphore(bound); } public void submitTask(final Runnable command) throws InterruptedException, RejectedExecutionException { semaphore.acquire(); try { exec.execute(new Runnable() { public void run() { try { command.run(); } finally { semaphore.release(); } } }); } catch (RejectedExecutionException e) { semaphore.release(); throw e; } } }
есть ли другие решения? Я бы предпочел что-то на основе
RejectedExecutionHandler
так как это похоже на стандартный способ справиться с такими ситуациями.
вы можете использовать ThreadPoolExecutor и blockingQueue:
public class ImageManager { BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(blockQueueSize); RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); private ExecutorService executorService = new ThreadPoolExecutor(numOfThread, numOfThread, 0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler); private int downloadThumbnail(String fileListPath){ executorService.submit(new yourRunnable()); } }
проверить четыре альтернативы для этого: создание NotifyingBlockingThreadPoolExecutor
вы должны использовать
CallerRunsPolicy
, который выполняет отклоненную задачу в вызывающем потоке. Таким образом, он не может отправлять какие-либо новые задачи исполнителю до тех пор, пока эта задача не будет выполнена, и в этот момент появятся некоторые свободные потоки пула или процесс повторится.документы:
отклоненных задач
новые задачи представлено в методе execute (java.ленг.Runnable) будет отклоняется, когда исполнитель был выключите, а также когда исполнитель использует конечные границы для обоих максимумов потоки и емкость рабочей очереди, а также насыщается. В любом случае метод execute вызывает RejectedExecutionHandler.rejectedExecution (java.ленг.Работоспособный, Ява.утиль.параллельный.ThreadPoolExecutor) способ его RejectedExecutionHandler. Четыре предопределенные политики обработчика при условии:
- в ThreadPoolExecutor по умолчанию.AbortPolicy, the обработчик создает среду выполнения RejectedExecutionException по факту отказ.
- В ThreadPoolExecutor.CallerRunsPolicy, поток, который вызывает выполнить себя запускает задачу. Это обеспечивает простой механизм управления с обратной связью, который будет замедлите скорость выполнения новых задач представлен.
- В ThreadPoolExecutor.DiscardPolicy, a задача, которая не может быть выполнена просто упал.
- В ThreadPoolExecutor.DiscardOldestPolicy, если исполнитель не выключен, то задача во главе рабочей очереди-это отброшено, а затем выполнение повторяется (что может снова потерпеть неудачу, вызывая это повторять.)
кроме того, не забудьте использовать ограниченную очередь, такую как ArrayBlockingQueue, при вызове
ThreadPoolExecutor
конструктор. В противном случае ничто не будет отвергнуто.изменить: в ответ на ваш комментарий, установите размер ArrayBlockingQueue должен быть равен максимальному размеру пула потоков и использовать AbortPolicy.
Edit 2: Хорошо, я вижу, к чему вы клоните. Как насчет этого: переопределите
beforeExecute()
метод, чтобы проверить, чтоgetActiveCount()
не превышаетgetMaximumPoolSize()
, и если это так, спать и попробовать еще раз?
Hibernate имеет
BlockPolicy
это просто и может делать то, что вы хотите:посмотреть: исполнителей.java
/** * A handler for rejected tasks that will have the caller block until * space is available. */ public static class BlockPolicy implements RejectedExecutionHandler { /** * Creates a <tt>BlockPolicy</tt>. */ public BlockPolicy() { } /** * Puts the Runnable to the blocking queue, effectively blocking * the delegating thread until space is available. * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { try { e.getQueue().put( r ); } catch (InterruptedException e1) { log.error( "Work discarded, thread was interrupted while waiting for space to schedule: {}", r ); } } }
The
BoundedExecutor
ответ цитируется выше от параллелизм Java на практике корректно работает только при использовании неограниченной очереди для исполнителя, или привязка семафора не превышает размер очереди. Семафор-это состояние, разделяемое между отправляющим потоком и потоками в пуле, что позволяет насыщать исполнителя, даже если размер очередииспользуя
CallerRunsPolicy
действует только в том случае, если ваши задачи не выполняются вечно, в котором если ваш отправляющий поток останется вrejectedExecution
навсегда, и плохая идея, если ваши задачи занимают много времени для запуска, потому что отправляющий поток не может отправлять новые задачи или делать что-либо еще, если он запускает саму задачу.Если это не приемлемо, то я предлагаю проверить размер ограниченной очереди исполнителя перед отправкой задачи. Если очередь заполнена, то подождите некоторое время, прежде чем пытаться отправить снова. Пропускная способность будет страдать, но я предлагаю это простое решение чем многие другие предлагаемые решения, и вы гарантированно ни одна задача не будет отклонена.
следующий класс обертывает ThreadPoolExecutor и использует семафор для блокировки, после чего рабочая очередь заполняется:
public final class BlockingExecutor { private final Executor executor; private final Semaphore semaphore; public BlockingExecutor(int queueSize, int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, ThreadFactory factory) { BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory); this.semaphore = new Semaphore(queueSize + maxPoolSize); } private void execImpl (final Runnable command) throws InterruptedException { semaphore.acquire(); try { executor.execute(new Runnable() { @Override public void run() { try { command.run(); } finally { semaphore.release(); } } }); } catch (RejectedExecutionException e) { // will never be thrown with an unbounded buffer (LinkedBlockingQueue) semaphore.release(); throw e; } } public void execute (Runnable command) throws InterruptedException { execImpl(command); } }
этот класс-оболочка основан на решении, приведенном в книге Java Concurrency in Practice Брайана Гетца. Решение в книге принимает только два параметра конструктора: an
Executor
и привязка, используемая для семафора. Это показано в ответе, данном Fixpoint. Существует проблема с этим подходом: он может попасть в состояние, когда потоки пула заняты, очередь заполнена, но семафор только что выпустил разрешение. (semaphore.release()
в блоке finally). В этом состоянии новая задача может захватить только что выпущенное разрешение, но отклоняется, поскольку очередь задач заполнена. Конечно, это не то, что вы хотите; вы хотите заблокировать в этом случае.чтобы решить эту проблему, мы должны использовать неограниченная очередь, как ясно упоминает JCiP. Семафор действует как охранник, давая эффект размера виртуальной очереди. Это имеет побочный эффект что возможно, что блок может содержать
maxPoolSize + virtualQueueSize + maxPoolSize
задач. Почему? Из-заsemaphore.release()
в блоке finally. Если все потоки пула вызывают этот оператор одновременно, тоmaxPoolSize
в выдаче вида на жительство, позволяющий одинаковое количество задач, чтобы войти в блок. Если бы мы использовали ограниченную очередь, она все равно была бы заполнена, что привело бы к отклонению задачи. Теперь, поскольку мы знаем, что это происходит только тогда, когда поток пула почти завершен, это не проблема. Мы знаем, что поток пула не будет блок, поэтому задача скоро будет взята из очереди.вы можете использовать ограниченную очередь. Просто убедитесь, что его размер равен
virtualQueueSize + maxPoolSize
. Большие размеры бесполезны, семафор не позволит впустить больше предметов. Меньшие размеры приведут к отклонению задач. Вероятность того, что задачи будут отклонены, увеличивается по мере уменьшения размера. Например, предположим, что вам нужен ограниченный исполнитель с maxPoolSize=2 и virtualQueueSize=5. Затем возьмите семафор с 5 + 2=7 разрешениями и фактическим размер очереди 5+2=7. Реальное количество задач, которые могут быть в блоке, то 2+5+2=9. Когда исполнитель заполнен (5 задач в очереди, 2 в пуле потоков, поэтому доступно 0 разрешений) и все потоки пула освобождают свои разрешения, то ровно 2 разрешения могут быть приняты входящими задачами.теперь решение от JCiP несколько громоздко использовать, поскольку оно не применяет все эти ограничения (неограниченная очередь или ограниченные этими математическими ограничениями и т. д.). Я думаю, что это служит хорошим пример, демонстрирующий, как можно создавать новые потокобезопасные классы на основе уже доступных частей, но не как полноценный многоразовый класс. Я не думаю, что последнее было намерением автора.
Я знаю, это хак, но на мой взгляд самый чистый Хак между теми, которые предлагаются здесь ; -)
поскольку ThreadPoolExecutor использует блокирующую очередь " предложение "вместо" put", позволяет переопределить поведение" предложения " блокирующей очереди:
class BlockingQueueHack<T> extends ArrayBlockingQueue<T> { BlockingQueueHack(int size) { super(size); } public boolean offer(T task) { try { this.put(task); } catch (InterruptedException e) { throw new RuntimeException(e); } return true; } } ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new BlockingQueueHack(5));
Я проверил это, и это, кажется, работает. Реализация некоторой политики тайм-аута остается в качестве упражнения для чтения.
вы можете использовать пользовательский RejectedExecutionHandler, как это
ThreadPoolExecutor tp= new ThreadPoolExecutor(core_size, // core size max_handlers, // max size timeout_in_seconds, // idle timeout TimeUnit.SECONDS, queue, new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // This will block if the queue is full try { executor.getQueue().put(r); } catch (InterruptedException e) { System.err.println(e.getMessage()); } } });
создайте свою собственную очередь блокировки, которая будет использоваться исполнителем, с блокирующим поведением, которое вы ищете, всегда возвращая доступную оставшуюся емкость (гарантируя, что исполнитель не будет пытаться создать больше потоков, чем его основной пул, или вызвать обработчик отклонения).
Я считаю, что это поможет вам блокирующее поведение, которое вы ищете. Обработчик отклонения никогда не будет соответствовать счету, так как это указывает на то, что исполнитель не может выполнить задачу. Что я мог себе представить существует то, что вы получаете некоторую форму "занятого ожидания" в обработчике. Это не то, что вы хотите, вы хотите очередь для исполнителя, который блокирует вызывающего абонента...
вы должны взглянуть на эта ссылка (notifying-blocking-thread-pool) который суммирует несколько решений и, наконец, дает элегантный с уведомлением.
чтобы избежать проблем с решением @FixPoint. Можно было бы использовать ListeningExecutorService и освободить семафор onSuccess и onFailure внутри FutureCallback.
недавно я обнаружил, что этот вопрос имеет ту же проблему. ОП не говорит об этом явно, но мы не хотим использовать
RejectedExecutionHandler
который выполняет задачу в потоке отправителя, потому что это будет недостаточно использовать рабочие потоки, если эта задача является длительной.чтение всех ответов и комментариев, в частности ошибочное решение с семафором или с помощью
afterExecute
у меня был более близкий взгляд на код ThreadPoolExecutor чтобы увидеть, если есть какой-то выход. Я был поражен, увидев, что есть более чем 2000 строк кода (с комментариями), некоторые из которых заставляют меня чувствовать себя головокружение. Учитывая довольно простое требование, которое у меня на самом деле есть-один производитель, несколько потребителей, пусть производитель блокирует, когда ни один потребитель не может работать-я решил свернуть свое собственное решение. Это неExecutorService
, а простоExecutor
. И он не приспосабливает количество потоков к рабочей нагрузке, а держит фиксированное количество потоков только, что также соответствует моему требования. Вот этот код. Не стесняйтесь разглагольствовать об этом :-)package x; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; /** * distributes {@code Runnable}s to a fixed number of threads. To keep the * code lean, this is not an {@code ExecutorService}. In particular there is * only very simple support to shut this executor down. */ public class ParallelExecutor implements Executor { // other bounded queues work as well and are useful to buffer peak loads private final BlockingQueue<Runnable> workQueue = new SynchronousQueue<Runnable>(); private final Thread[] threads; /*+**********************************************************************/ /** * creates the requested number of threads and starts them to wait for * incoming work */ public ParallelExecutor(int numThreads) { this.threads = new Thread[numThreads]; for(int i=0; i<numThreads; i++) { // could reuse the same Runner all over, but keep it simple Thread t = new Thread(new Runner()); this.threads[i] = t; t.start(); } } /*+**********************************************************************/ /** * returns immediately without waiting for the task to be finished, but may * block if all worker threads are busy. * * @throws RejectedExecutionException if we got interrupted while waiting * for a free worker */ @Override public void execute(Runnable task) { try { workQueue.put(task); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RejectedExecutionException("interrupt while waiting for a free " + "worker.", e); } } /*+**********************************************************************/ /** * Interrupts all workers and joins them. Tasks susceptible to an interrupt * will preempt their work. Blocks until the last thread surrendered. */ public void interruptAndJoinAll() throws InterruptedException { for(Thread t : threads) { t.interrupt(); } for(Thread t : threads) { t.join(); } } /*+**********************************************************************/ private final class Runner implements Runnable { @Override public void run() { while (!Thread.currentThread().isInterrupted()) { Runnable task; try { task = workQueue.take(); } catch (InterruptedException e) { // canonical handling despite exiting right away Thread.currentThread().interrupt(); return; } try { task.run(); } catch (RuntimeException e) { // production code to use a logging framework e.printStackTrace(); } } } } }
Я считаю, что вполне элегантный способ решить эту проблему с помощью
java.util.concurrent.Semaphore
и делегирование поведениеExecutor.newFixedThreadPool
. Новая служба исполнителя будет выполнять новую задачу только тогда, когда для этого есть поток. Блокировка управляется семафором с количеством разрешений равным количеству потоков. Когда задача завершена, она возвращает разрешение.public class FixedThreadBlockingExecutorService extends AbstractExecutorService { private final ExecutorService executor; private final Semaphore blockExecution; public FixedThreadBlockingExecutorService(int nTreads) { this.executor = Executors.newFixedThreadPool(nTreads); blockExecution = new Semaphore(nTreads); } @Override public void shutdown() { executor.shutdown(); } @Override public List<Runnable> shutdownNow() { return executor.shutdownNow(); } @Override public boolean isShutdown() { return executor.isShutdown(); } @Override public boolean isTerminated() { return executor.isTerminated(); } @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return executor.awaitTermination(timeout, unit); } @Override public void execute(Runnable command) { blockExecution.acquireUninterruptibly(); executor.execute(() -> { try { command.run(); } finally { blockExecution.release(); } }); }
У меня была такая же потребность в прошлом: своего рода блокирующая очередь с фиксированным размером для каждого клиента, поддерживаемая общим пулом потоков. Я закончил тем, что написал свой собственный вид ThreadPoolExecutor:
UserThreadPoolExecutor (блокирующая очередь (для каждого клиента) + threadpool (общий для всех клиентов))
см.:https://github.com/d4rxh4wx/UserThreadPoolExecutor
каждому UserThreadPoolExecutor присваивается максимальное количество потоков из общего доступа ThreadPoolExecutor
каждый пользователь ThreadPoolExecutor может:
- отправить задачу исполнителю пула общих потоков, если его квота не достигнута. Если квота исчерпана, задание ставится в очередь (непотребительского блокирования в ожидании процессора). После завершения одной из представленных задач квота уменьшается, позволяя другой задаче, ожидающей отправки в ThreadPoolExecutor
- дождитесь завершения остальных задач
Я нашел эту политику отклонения в клиенте эластичного поиска. Он блокирует поток вызывающего абонента при блокировке очереди. Код ниже -
static class ForceQueuePolicy implements XRejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { executor.getQueue().put(r); } catch (InterruptedException e) { //should never happen since we never wait throw new EsRejectedExecutionException(e); } } @Override public long rejected() { return 0; } }
мне недавно нужно было добиться чего-то подобного, но на a
ScheduledExecutorService
.Я также должен был убедиться, что я обрабатываю задержку, передаваемую по методу, и гарантирую, что либо задача будет отправлена для выполнения в то время, когда вызывающий ожидает, либо просто не сможет, таким образом, бросая
RejectedExecutionException
.другие методы от
ScheduledThreadPoolExecutor
чтобы выполнить или отправить задачу внутренне вызов#schedule
который по - прежнему будет вызывать переопределенные методы.import java.util.concurrent.*; public class BlockingScheduler extends ScheduledThreadPoolExecutor { private final Semaphore maxQueueSize; public BlockingScheduler(int corePoolSize, ThreadFactory threadFactory, int maxQueueSize) { super(corePoolSize, threadFactory, new AbortPolicy()); this.maxQueueSize = new Semaphore(maxQueueSize); } @Override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { final long newDelayInMs = beforeSchedule(command, unit.toMillis(delay)); return super.schedule(command, newDelayInMs, TimeUnit.MILLISECONDS); } @Override public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { final long newDelayInMs = beforeSchedule(callable, unit.toMillis(delay)); return super.schedule(callable, newDelayInMs, TimeUnit.MILLISECONDS); } @Override public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay)); return super.scheduleAtFixedRate(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS); } @Override public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long period, TimeUnit unit) { final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay)); return super.scheduleWithFixedDelay(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS); } @Override protected void afterExecute(Runnable runnable, Throwable t) { super.afterExecute(runnable, t); try { if (t == null && runnable instanceof Future<?>) { try { ((Future<?>) runnable).get(); } catch (CancellationException | ExecutionException e) { t = e; } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // ignore/reset } } if (t != null) { System.err.println(t); } } finally { releaseQueueUsage(); } } private long beforeSchedule(Runnable runnable, long delay) { try { return getQueuePermitAndModifiedDelay(delay); } catch (InterruptedException e) { getRejectedExecutionHandler().rejectedExecution(runnable, this); return 0; } } private long beforeSchedule(Callable callable, long delay) { try { return getQueuePermitAndModifiedDelay(delay); } catch (InterruptedException e) { getRejectedExecutionHandler().rejectedExecution(new FutureTask(callable), this); return 0; } } private long getQueuePermitAndModifiedDelay(long delay) throws InterruptedException { final long beforeAcquireTimeStamp = System.currentTimeMillis(); maxQueueSize.tryAcquire(delay, TimeUnit.MILLISECONDS); final long afterAcquireTimeStamp = System.currentTimeMillis(); return afterAcquireTimeStamp - beforeAcquireTimeStamp; } private void releaseQueueUsage() { maxQueueSize.release(); } }
у меня есть код здесь, будем признательны за любую обратную связь. https://github.com/AmitabhAwasthi/BlockingScheduler
вот решение, которое, кажется, работает очень хорошо. Это называется NotifyingBlockingThreadPoolExecutor.
Edit: есть вопрос С этим кодом метод await () является багги. Вызов shutdown () + awaitTermination () кажется, работает нормально.
мне не всегда нравится CallerRunsPolicy, тем более, что он позволяет отклоненной задаче "пропустить очередь" и выполнить до задач, которые были отправлены ранее. Кроме того, выполнение задачи в вызывающем потоке может занять гораздо больше времени, чем ожидание появления первого слота.
Я решил эту проблему с помощью настраиваемого RejectedExecutionHandler, который просто блокирует вызывающий поток на некоторое время, а затем пытается представить задачу и снова:
public class BlockWhenQueueFull implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // The pool is full. Wait, then try again. try { long waitMs = 250; Thread.sleep(waitMs); } catch (InterruptedException interruptedException) {} executor.execute(r); } }
этот класс можно просто использовать в исполнителе пула потоков как RejectedExecutinHandler, как и любой другой, например:
executorPool = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new BlockWhenQueueFull());
единственный недостаток, который я вижу, заключается в том, что вызывающий поток может быть заблокирован немного дольше, чем это необходимо (до 250 мс). Кроме того, поскольку этот исполнитель фактически вызывается рекурсивно, очень долгое ожидание появления потока (часы) может привести к переполнению стека.
Тем Не Менее, Я лично мне нравится этот метод. Это компактный, легко понять, и работает хорошо.