Обработка исключений из задач Java ExecutorService
Я пытаюсь использовать Java ThreadPoolExecutor
класс для выполнения большого количества тяжелых задач с фиксированным количеством потоков. Каждая из задач имеет много мест, во время которых она может потерпеть неудачу из-за исключений.
Я подкласса ThreadPoolExecutor
и я переопределил afterExecute
метод, который должен предоставлять любые неперехваченные исключения, встречающиеся во время выполнения задачи. Однако я не могу заставить его работать.
например:
public class ThreadPoolErrors extends ThreadPoolExecutor {
public ThreadPoolErrors() {
super( 1, // core threads
1, // max threads
1, // timeout
TimeUnit.MINUTES, // timeout units
new LinkedBlockingQueue<Runnable>() // work queue
);
}
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if(t != null) {
System.out.println("Got an error: " + t);
} else {
System.out.println("Everything's fine--situation normal!");
}
}
public static void main( String [] args) {
ThreadPoolErrors threadPool = new ThreadPoolErrors();
threadPool.submit(
new Runnable() {
public void run() {
throw new RuntimeException("Ouch! Got an error.");
}
}
);
threadPool.shutdown();
}
}
выход из этого программа "Все в порядке-ситуация нормальная!"несмотря на то, что единственный запускаемый объект, отправленный в пул потоков, создает исключение. Есть ли ключ к тому, что здесь происходит?
спасибо!
11 ответов:
С docs:
Примечание: когда действия заключены в задачи (например, FutureTask) либо явно или с помощью таких методов, как отправить, эти объекты задачи поймать и поддерживать вычислительные исключения и так что они не вызывают резких прекращение, и внутреннее исключения к этому не передаются метод.
когда вы отправляете Runnable, он будет завернут в будущем.
ваш afterExecute должен быть что-то вроде этого:
public final class ExtendedExecutor extends ThreadPoolExecutor { // ... protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t == null && r instanceof Future<?>) { try { Future<?> future = (Future<?>) r; if (future.isDone()) { future.get(); } } catch (CancellationException ce) { t = ce; } catch (ExecutionException ee) { t = ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } if (t != null) { System.out.println(t); } } }
предупреждение: следует отметить, что это решение будет блокировать вызывающий поток.
если вы хотите обрабатывать исключения задач, то это, как правило, лучше использовать
Callable
, а неRunnable
.
Callable.call()
разрешено выбрасывать проверенные исключения, и они распространяются обратно в вызывающий поток:Callable task = ... Future future = executor.submit(task); try { future.get(); } catch (ExecutionException ex) { ex.getCause().printStackTrace(); }
если
Callable.call()
бросает исключение, это будет обернуто вExecutionException
и брошенныйFuture.get()
.это, вероятно, будет гораздо предпочтительнее подклассов
ThreadPoolExecutor
. Это также дает вам возможность повторно отправить задачу, если исключение является восстанавливаемым.
объяснение такого поведения в javadoc для afterExecute:
Примечание: когда действия заключены в задачи (например, FutureTask) либо явно или с помощью таких методов, как отправить, эти объекты задачи поймать и поддерживать вычислительные исключения и так что они не вызывают резких прекращение, и внутреннее исключения к этому не передаются метод.
Я обошел его, обернув поставляемый runnable, представленный исполнителю.
CompletableFuture.runAsync( () -> { try { runnable.run(); } catch (Throwable e) { Log.info(Concurrency.class, "runAsync", e); } }, executorService );
Я использую
VerboseRunnable
класс jcabi-log, который проглатывает все исключения и регистрирует их. Очень удобно, например:import com.jcabi.log.VerboseRunnable; scheduler.scheduleWithFixedDelay( new VerboseRunnable( Runnable() { public void run() { // the code, which may throw } }, true // it means that all exceptions will be swallowed and logged ), 1, 1, TimeUnit.MILLISECONDS );
другое решение было бы использовать ManagedTask и ManagedTaskListener.
вам понадобится вызвать или Runnable, который реализует интерфейс ManagedTask.
метод
getManagedTaskListener
возвращает нужный экземпляр.public ManagedTaskListener getManagedTaskListener() {
а вы реализуете в ManagedTaskListener the
taskDone
способ:@Override public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) { if (exception != null) { LOGGER.log(Level.SEVERE, exception.getMessage()); } }
подробнее о управлял задач жизненный цикл и слушатель.
Если вы хотите контролировать выполнение задачи, вы можете запустить 1 или 2 потока (возможно, больше в зависимости от нагрузки) и использовать их для выполнения задач из оболочки ExecutionCompletionService.
если
ExecutorService
исходит из внешнего источника (т. е. невозможно подклассThreadPoolExecutor
и заменитьafterExecute()
), вы можете использовать динамический прокси для достижения желаемого поведения:public static ExecutorService errorAware(final ExecutorService executor) { return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] {ExecutorService.class}, (proxy, method, args) -> { if (method.getName().equals("submit")) { final Object arg0 = args[0]; if (arg0 instanceof Runnable) { args[0] = new Runnable() { @Override public void run() { final Runnable task = (Runnable) arg0; try { task.run(); if (task instanceof Future<?>) { final Future<?> future = (Future<?>) task; if (future.isDone()) { try { future.get(); } catch (final CancellationException ce) { // Your error-handling code here ce.printStackTrace(); } catch (final ExecutionException ee) { // Your error-handling code here ee.getCause().printStackTrace(); } catch (final InterruptedException ie) { Thread.currentThread().interrupt(); } } } } catch (final RuntimeException re) { // Your error-handling code here re.printStackTrace(); throw re; } catch (final Error e) { // Your error-handling code here e.printStackTrace(); throw e; } } }; } else if (arg0 instanceof Callable<?>) { args[0] = new Callable<Object>() { @Override public Object call() throws Exception { final Callable<?> task = (Callable<?>) arg0; try { return task.call(); } catch (final Exception e) { // Your error-handling code here e.printStackTrace(); throw e; } catch (final Error e) { // Your error-handling code here e.printStackTrace(); throw e; } } }; } } return method.invoke(executor, args); }); }
это из-за
AbstractExecutorService :: submit
- это упаковкаrunnable
наRunnableFuture
(ничего, кромеFutureTask
), как показано нижеAbstractExecutorService.java public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE//////// execute(ftask); return ftask; }
затем
execute
пройдет вWorker
иWorker.run()
назовем ниже.ThreadPoolExecutor.java final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); /////////HERE//////// } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
наконец-то
task.run();
в приведенном выше коде вызов будет вызыватьFutureTask.run()
. Вот код обработчика исключений, из-за это вы не получаете ожидаемого исключения.class FutureTask<V> implements RunnableFuture<V> public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { /////////HERE//////// result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
это работает
- Он получен из SingleThreadExecutor, но вы можете легко адаптировать его
- код например, лямбды в Java 8, но легко исправить
он создаст исполнителя с одним потоком, который может получить много задач; и будет ждать, пока текущий завершит выполнение, чтобы начать со следующего
в случае ошибки uncaugth или исключения uncaughtExceptionHandler поймает его
public final class SingleThreadExecutorWithExceptions { public static ExecutorService newSingleThreadExecutorWithExceptions(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { ThreadFactory factory = (Runnable runnable) -> { final Thread newThread = new Thread(runnable, "SingleThreadExecutorWithExceptions"); newThread.setUncaughtExceptionHandler( (final Thread caugthThread,final Throwable throwable) -> { uncaughtExceptionHandler.uncaughtException(caugthThread, throwable); }); return newThread; }; return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), factory){ protected void afterExecute(Runnable runnable, Throwable throwable) { super.afterExecute(runnable, throwable); if (throwable == null && runnable instanceof Future) { try { Future future = (Future) runnable; if (future.isDone()) { future.get(); } } catch (CancellationException ce) { throwable = ce; } catch (ExecutionException ee) { throwable = ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // ignore/reset } } if (throwable != null) { uncaughtExceptionHandler.uncaughtException(Thread.currentThread(),throwable); } } }); } private static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } protected void finalize() { super.shutdown(); } } /** * A wrapper class that exposes only the ExecutorService methods * of an ExecutorService implementation. */ private static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } public Future submit(Runnable task) { return e.submit(task); } public Future submit(Callable task) { return e.submit(task); } public Future submit(Runnable task, T result) { return e.submit(task, result); } public List> invokeAll(Collection> tasks) throws InterruptedException { return e.invokeAll(tasks); } public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { return e.invokeAll(tasks, timeout, unit); } public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { return e.invokeAny(tasks); } public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return e.invokeAny(tasks, timeout, unit); } } private SingleThreadExecutorWithExceptions() {} }
вместо подкласса ThreadPoolExecutor, я бы предоставил ему ThreadFactory экземпляр, который создает новые потоки и предоставляет им UncaughtExceptionHandler