Есть ли ExecutorService, который использует текущий поток?
то, что мне нужно, - это совместимый способ настроить использование пула потоков или нет. В идеале остальная часть кода не должна быть затронута вообще. Я мог бы использовать пул потоков с 1 потоком, но это не совсем то, что я хочу. Есть идеи?
ExecutorService es = threads == 0 ? new CurrentThreadExecutor() : Executors.newThreadPoolExecutor(threads);
// es.execute / es.submit / new ExecutorCompletionService(es) etc
6 ответов:
вот действительно простой
Executor
(неExecutorService
, имейте в виду) реализация, которая использует только текущий поток. Кража этого из "параллелизма Java на практике" (существенное чтение).public class CurrentThreadExecutor implements Executor { public void execute(Runnable r) { r.run(); } }
ExecutorService
- это более сложный интерфейс, но может быть обработан с такой же подход.
вы можете использовать гуавы
MoreExecutors.newDirectExecutorService()
илиMoreExecutors.directExecutor()
если вам не нуженExecutorService
.если в том числе гуава слишком тяжелый вес, вы можете реализовать что-то почти так же хорошо:
public final class SameThreadExecutorService extends ThreadPoolExecutor { private final CountDownLatch signal = new CountDownLatch(1); private SameThreadExecutorService() { super(1, 1, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy()); } @Override public void shutdown() { super.shutdown(); signal.countDown(); } public static ExecutorService getInstance() { return SingletonHolder.instance; } private static class SingletonHolder { static ExecutorService instance = createInstance(); } private static ExecutorService createInstance() { final SameThreadExecutorService instance = new SameThreadExecutorService(); // The executor has one worker thread. Give it a Runnable that waits // until the executor service is shut down. // All other submitted tasks will use the RejectedExecutionHandler // which runs tasks using the caller's thread. instance.submit(new Runnable() { @Override public void run() { boolean interrupted = false; try { while (true) { try { instance.signal.await(); break; } catch (InterruptedException e) { interrupted = true; } } } finally { if (interrupted) { Thread.currentThread().interrupt(); } } }}); return Executors.unconfigurableScheduledExecutorService(instance); } }
Я написал ExecutorService на основе AbstractExecutorService.
/** * Executes all submitted tasks directly in the same thread as the caller. */ public class SameThreadExecutorService extends AbstractExecutorService { //volatile because can be viewed by other threads private volatile boolean terminated; @Override public void shutdown() { terminated = true; } @Override public boolean isShutdown() { return terminated; } @Override public boolean isTerminated() { return terminated; } @Override public boolean awaitTermination(long theTimeout, TimeUnit theUnit) throws InterruptedException { shutdown(); // TODO ok to call shutdown? what if the client never called shutdown??? return terminated; } @Override public List<Runnable> shutdownNow() { return Collections.emptyList(); } @Override public void execute(Runnable theCommand) { theCommand.run(); } }
вы можете использовать RejectedExecutionHandler для запуска задачи в текущем потоке.
public static final ThreadPoolExecutor CURRENT_THREAD_EXECUTOR = new ThreadPoolExecutor(0, 0, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { r.run(); } });
вам нужен только один из них когда-нибудь.
мне пришлось использовать тот же" CurrentThreadExecutorService " для тестирования и, хотя все предлагаемые решения были хорошими (особенно тот, который упоминает путь гуавы), Я придумал что-то похожее на то, что Питер Lawrey предложил здесь.
как упоминалось Аксель Циглер здесь, к сожалению, решение Питера фактически не будет работать из-за проверки, введенной в
ThreadPoolExecutor
наmaximumPoolSize
параметр конструктора (т. е.maximumPoolSize
не может быть<=0
).чтобы обойти это, я сделал следующее:
private static ExecutorService currentThreadExecutorService() { CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy(); return new ThreadPoolExecutor(0, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), callerRunsPolicy) { @Override public void execute(Runnable command) { callerRunsPolicy.rejectedExecution(command, this); } }; }