Является ли ThreadPoolExecutor потокобезопасным?


гарантирует ли ExecutorService потокобезопасность ?

Я буду отправлять задания из разных потоков в один и тот же ThreadPoolExecutor, мне нужно синхронизировать доступ к исполнителю перед взаимодействием/отправкой задач?

6 65

6 ответов:

Это правда, классы JDK, о которых идет речь, похоже, не дают явной гарантии потокобезопасной отправки задачи. Однако на практике все реализации ExecutorService в библиотеке действительно потокобезопасны таким образом. Я думаю, что это разумно, чтобы зависеть от этого. Поскольку весь код, реализующий эти функции, был помещен в общественное достояние, нет абсолютно никакой мотивации для кого-либо полностью переписать его по-другому.

(вопреки другим ответам) контракт на потокобезопасность и документально: посмотрите в interface javadocs (в отличие от javadoc методов). Например, в нижней части ExecutorService javadoc вы найдете:

эффекты непротиворечивости памяти: действия в потоке до отправка выполняемой или вызываемой задачи в Службу ExecutorService происходят-перед любые действия этой задачи, которая, в свою очередь, происходят-перед результат извлекается через будущее.получить.)(

этого достаточно, чтобы ответить на этот:

" должен ли я синхронизировать доступ к исполнителю перед взаимодействием/отправкой задач?"

нет, вы не. Это прекрасно для создания и отправки заданий на любой (правильно реализован) ExecutorService без внешней синхронизации. Это одна из главных целей дизайна.

ExecutorService это одновременно утилита, которая должна сказать, что она предназначена для работы в наибольшей степени, не требуя синхронизации, для производительности. (Синхронизация вызывает конфликт потоков, который может ухудшить эффективность многопоточности-особенно при масштабировании до большого количества потоков.)

нет никакой гарантии о том, в какое время в будущем задачи будут выполняться или завершаться (некоторые могут даже выполняться сразу в том же потоке, который их отправил), однако рабочий поток гарантированно видел все эффекты, которые выполнял отправляющий поток до момента подачи. Поэтому (поток, который выполняется) ваша задача также может безопасно читать любые данные, созданные для его использования без синхронизации, потокобезопасных классов или любых других форм "безопасной публикации". Сам акт представления задачи достаточен для "безопасной публикации" входных данных в задачу. Вам просто нужно убедиться, что входные данные не будут изменены в в любом случае, пока задача выполняется.

аналогично, когда вы получаете результат задачи обратно через Future.get(), извлекающий поток будет гарантированно видеть все эффекты, сделанные рабочим потоком исполнителя (как в возвращенном результате, так и в любых изменениях побочных эффектов, которые мог сделать рабочий поток).

этот контракт также подразумевает, что это нормально для самих задач, чтобы представить больше задач.

" гарантирует ли ExecutorService потокобезопасность ?"

теперь эта часть вопроса гораздо больше общего. Например, не удалось найти ни одного утверждения контракта на потокобезопасность о методе shutdownAndAwaitTermination - хотя я отмечаю, что пример кода в Javadoc не использует синхронизацию. (Хотя, возможно, есть скрытое предположение, что завершение работы инициируется тем же потоком, который создал исполнитель, а не, например, рабочий поток?)

кстати я бы рекомендовал книгу " Java Concurrency In Практика " для хорошего землянина в мире параллельного программирования.

Ваш вопрос довольно открытый: все ExecutorService интерфейс делает это гарантирует, что какой-то поток где-то будет обрабатывать представленные Runnable или Callable экземпляра.

если представленные Runnable/Callable ссылается на общую структуру данных, доступную из других Runnable/Callables экземпляры (потенциально обрабатываются одновременно различными потоками), то это ваши обязанности для обеспечения потокобезопасности этих данных структура.

чтобы ответить на вторую часть вашего вопроса, да, у вас будет доступ к ThreadPoolExecutor перед отправкой любых задач; например

BlockingQueue<Runnable> workQ = new LinkedBlockingQueue<Runnable>();
ExecutorService execService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.SECONDS, workQ);
...
execService.submit(new Callable(...));

EDIT

на основе комментария Брайана и в случае, если я неправильно понял ваш вопрос: Представление задач из нескольких потоков производителя в ExecutorService обычно будет потокобезопасным (несмотря на то, что он явно не упоминается в API интерфейса, насколько я могу судить). Любой реализация, которая не предлагала потокобезопасность, была бы бесполезной в многопоточной среде (поскольку несколько производителей / несколько потребителей-довольно распространенная парадигма), и это именно то, что ExecutorService (а остальные java.util.concurrent) был предназначен для.

на ThreadPoolExecutor ответ:да. ExecutorService тут не мандат или иным образом гарантировать, что все реализации потокобезопасны, и это не может, поскольку это интерфейс. Эти типы контрактов находятся вне области действия интерфейса Java. Однако,ThreadPoolExecutor и то, и другое четко документировано как потокобезопасное. Более того,ThreadPoolExecutor управляет это очередь заданий с помощью java.util.concurrent.BlockingQueue который является интерфейсом, который запрашивает все реализации потокобезопасны. Любой java.util.concurrent.* реализация BlockingQueue можно смело считать потокобезопасным. Любая нестандартная реализация не может, хотя это было бы откровенно глупо, если бы кто-то предоставил BlockingQueue реализация очереди, которая не была потокобезопасной.

так что ответ на ваш титульный вопрос явно да. Ответ на последующий текст вашего вопроса -наверное, так как есть некоторые расхождения между ними.

вопреки тому, что ответ Люк Usherwood претензии, это не подразумевается в документации, что ExecutorService реализаций гарантированно будут ориентированы на многопотоковое исполнение. Что касается вопроса о ThreadPoolExecutor в частности, см. другие ответы.

да, a происходит-перед связь указана, но это не означает ничего о потокобезопасности самих методов, как прокомментировал км. В Люк Usherwood ' s ответ утверждается, что первое достаточно для доказательства второго, но никаких фактических аргументов не приводится.

"потокобезопасность" может означать разные вещи, но вот простой встречный пример Executor (не ExecutorService но это не имеет никакого значения), что тривиально соответствует требуемому происходит-перед отношения, но не является потокобезопасным из-за несинхронизированного доступа к

для ThreadPoolExecutor он представляет потокобезопасность. Вы можете увидеть исходный код в jdk8. При добавлении новой задачи он использует mainLock для обеспечения потокобезопасности.

private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);

                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;

                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }

            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());

                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }