Является ли ThreadPoolExecutor потокобезопасным?
гарантирует ли ExecutorService потокобезопасность ?
Я буду отправлять задания из разных потоков в один и тот же ThreadPoolExecutor, мне нужно синхронизировать доступ к исполнителю перед взаимодействием/отправкой задач?
6 ответов:
Это правда, классы JDK, о которых идет речь, похоже, не дают явной гарантии потокобезопасной отправки задачи. Однако на практике все реализации ExecutorService в библиотеке действительно потокобезопасны таким образом. Я думаю, что это разумно, чтобы зависеть от этого. Поскольку весь код, реализующий эти функции, был помещен в общественное достояние, нет абсолютно никакой мотивации для кого-либо полностью переписать его по-другому.
(вопреки другим ответам) контракт на потокобезопасность и документально: посмотрите в
interface
javadocs (в отличие от javadoc методов). Например, в нижней части ExecutorService javadoc вы найдете:эффекты непротиворечивости памяти: действия в потоке до отправка выполняемой или вызываемой задачи в Службу ExecutorService происходят-перед любые действия этой задачи, которая, в свою очередь, происходят-перед результат извлекается через будущее.получить.)(
этого достаточно, чтобы ответить на этот:
" должен ли я синхронизировать доступ к исполнителю перед взаимодействием/отправкой задач?"
нет, вы не. Это прекрасно для создания и отправки заданий на любой (правильно реализован)
ExecutorService
без внешней синхронизации. Это одна из главных целей дизайна.
ExecutorService
это одновременно утилита, которая должна сказать, что она предназначена для работы в наибольшей степени, не требуя синхронизации, для производительности. (Синхронизация вызывает конфликт потоков, который может ухудшить эффективность многопоточности-особенно при масштабировании до большого количества потоков.)нет никакой гарантии о том, в какое время в будущем задачи будут выполняться или завершаться (некоторые могут даже выполняться сразу в том же потоке, который их отправил), однако рабочий поток гарантированно видел все эффекты, которые выполнял отправляющий поток до момента подачи. Поэтому (поток, который выполняется) ваша задача также может безопасно читать любые данные, созданные для его использования без синхронизации, потокобезопасных классов или любых других форм "безопасной публикации". Сам акт представления задачи достаточен для "безопасной публикации" входных данных в задачу. Вам просто нужно убедиться, что входные данные не будут изменены в в любом случае, пока задача выполняется.
аналогично, когда вы получаете результат задачи обратно через
Future.get()
, извлекающий поток будет гарантированно видеть все эффекты, сделанные рабочим потоком исполнителя (как в возвращенном результате, так и в любых изменениях побочных эффектов, которые мог сделать рабочий поток).этот контракт также подразумевает, что это нормально для самих задач, чтобы представить больше задач.
" гарантирует ли ExecutorService потокобезопасность ?"
теперь эта часть вопроса гораздо больше общего. Например, не удалось найти ни одного утверждения контракта на потокобезопасность о методе
shutdownAndAwaitTermination
- хотя я отмечаю, что пример кода в Javadoc не использует синхронизацию. (Хотя, возможно, есть скрытое предположение, что завершение работы инициируется тем же потоком, который создал исполнитель, а не, например, рабочий поток?)кстати я бы рекомендовал книгу " Java Concurrency In Практика " для хорошего землянина в мире параллельного программирования.
Ваш вопрос довольно открытый: все
ExecutorService
интерфейс делает это гарантирует, что какой-то поток где-то будет обрабатывать представленныеRunnable
илиCallable
экземпляра.если представленные
Runnable
/Callable
ссылается на общую структуру данных, доступную из другихRunnable
/Callable
s экземпляры (потенциально обрабатываются одновременно различными потоками), то это ваши обязанности для обеспечения потокобезопасности этих данных структура.чтобы ответить на вторую часть вашего вопроса, да, у вас будет доступ к ThreadPoolExecutor перед отправкой любых задач; например
BlockingQueue<Runnable> workQ = new LinkedBlockingQueue<Runnable>(); ExecutorService execService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.SECONDS, workQ); ... execService.submit(new Callable(...));
EDIT
на основе комментария Брайана и в случае, если я неправильно понял ваш вопрос: Представление задач из нескольких потоков производителя в
ExecutorService
обычно будет потокобезопасным (несмотря на то, что он явно не упоминается в API интерфейса, насколько я могу судить). Любой реализация, которая не предлагала потокобезопасность, была бы бесполезной в многопоточной среде (поскольку несколько производителей / несколько потребителей-довольно распространенная парадигма), и это именно то, чтоExecutorService
(а остальныеjava.util.concurrent
) был предназначен для.
на
ThreadPoolExecutor
ответ:да.ExecutorService
тут не мандат или иным образом гарантировать, что все реализации потокобезопасны, и это не может, поскольку это интерфейс. Эти типы контрактов находятся вне области действия интерфейса Java. Однако,ThreadPoolExecutor
и то, и другое четко документировано как потокобезопасное. Более того,ThreadPoolExecutor
управляет это очередь заданий с помощьюjava.util.concurrent.BlockingQueue
который является интерфейсом, который запрашивает все реализации потокобезопасны. Любойjava.util.concurrent.*
реализацияBlockingQueue
можно смело считать потокобезопасным. Любая нестандартная реализация не может, хотя это было бы откровенно глупо, если бы кто-то предоставилBlockingQueue
реализация очереди, которая не была потокобезопасной.так что ответ на ваш титульный вопрос явно да. Ответ на последующий текст вашего вопроса -наверное, так как есть некоторые расхождения между ними.
вопреки тому, что ответ Люк Usherwood претензии, это не подразумевается в документации, что
ExecutorService
реализаций гарантированно будут ориентированы на многопотоковое исполнение. Что касается вопроса оThreadPoolExecutor
в частности, см. другие ответы.да, a происходит-перед связь указана, но это не означает ничего о потокобезопасности самих методов, как прокомментировал км. В Люк Usherwood ' s ответ утверждается, что первое достаточно для доказательства второго, но никаких фактических аргументов не приводится.
"потокобезопасность" может означать разные вещи, но вот простой встречный пример
Executor
(неExecutorService
но это не имеет никакого значения), что тривиально соответствует требуемому происходит-перед отношения, но не является потокобезопасным из-за несинхронизированного доступа к
для ThreadPoolExecutor он представляет потокобезопасность. Вы можете увидеть исходный код в jdk8. При добавлении новой задачи он использует mainLock для обеспечения потокобезопасности.
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }