Пользовательский пул потоков в параллельном потоке Java 8
можно ли указать пользовательский пул потоков для Java 8 параллельный поток? Я не могу найти его нигде.
представьте себе, что у меня есть серверное приложение, и я хотел бы использовать параллельные потоки. Но приложение является большим и многопоточным, поэтому я хочу разделить его. Я не хочу медленного выполнения задачи в одном модуле задач applicationblock из другого модуля.
Если я не могу использовать разные пулы потоков для разных модулей, это значит, я не могу безопасно использовать параллельные потоки в большинстве реальных ситуаций.
попробуйте следующий пример. Есть некоторые задачи с интенсивным ЦП, выполняемые в отдельных потоках. Задачи используют параллельные потоки. Первая задача нарушена, поэтому каждый шаг занимает 1 секунду (имитируется потоком сна). Проблема в том, что другие потоки застревают и ждут завершения сломанной задачи. Это надуманный пример, но представьте себе приложение сервлета и кого-то, отправляющего длительную задачу в общий доступ вилка присоединиться бассейн.
public class ParallelTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() -> runTask(1000)); //incorrect task
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
}
private static void runTask(int delay) {
range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
.ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
}
public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}
}
10 ответов:
на самом деле есть трюк, Как выполнить параллельную операцию в определенном пуле fork-join. Если вы выполняете его как задачу в пуле fork-join, он остается там и не использует общий.
ForkJoinPool forkJoinPool = new ForkJoinPool(2); forkJoinPool.submit(() -> //parallel task here, for example IntStream.range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()) ).get();
трюк основан на ForkJoinTask.вилка который указывает: "организует асинхронное выполнение этой задачи в пуле, в котором выполняется текущая задача, если это применимо, или с помощью ForkJoinPool.commonPool () если не inForkJoinPool ()"
параллельные потоки используют значение по умолчанию
ForkJoinPool.commonPool
, который по умолчанию имеет один меньше потоков, сколько процессоров, как возвращеноRuntime.getRuntime().availableProcessors()
(это означает, что параллельные потоки используют все ваши процессоры, потому что они также используют основной поток):для приложений, которые требуют отдельных или пользовательских пулов, ForkJoinPool может быть построен с заданным целевым уровнем параллелизма; по умолчанию, равным количеству доступных процессоры.
Это также означает, что если у вас есть вложенные параллельные потоки или несколько параллельных потоков, запущенных одновременно, они все будут долю тот же бассейн. Преимущество: вы никогда не будете использовать больше, чем по умолчанию (число доступных процессоров). Недостаток: вы не можете получить "все процессоры", назначенные каждому параллельному потоку, который вы инициируете (если у вас есть более одного). (По-видимому, вы можете использовать ManagedBlocker обойти что.)
чтобы изменить способ выполнения параллельных потоков, вы можете либо
- отправьте выполнение параллельного потока в свой собственный ForkJoinPool:
yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();
или- вы можете изменить размер общего пула, используя свойства системы:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")
для целевого параллелизма 20 потоков.
пример последнего на моей машине, которая имеет 8 процессоров. Если я запускаю следующую программу:
long start = System.currentTimeMillis(); IntStream s = IntStream.range(0, 20); //System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20"); s.parallel().forEach(i -> { try { Thread.sleep(100); } catch (Exception ignore) {} System.out.print((System.currentTimeMillis() - start) + " "); });
выход это:
215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416
таким образом, вы можете видеть, что параллельный поток обрабатывает 8 элементов одновременно, т. е. он использует 8 потоков. Однако, если я раскомментирую комментируемую строку, вывод будет:
215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216
на этот раз параллельный поток использовал 20 потоков и все 20 элементов в потоке были обработаны одновременно.
в качестве альтернативы трюку запуска параллельных вычислений внутри вашего собственного forkJoinPool вы также можете передать этот пул в CompletableFuture.supplyAsync метод, как в:
ForkJoinPool forkJoinPool = new ForkJoinPool(2); CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() -> //parallel task here, for example range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), forkJoinPool );
использование ForkJoinPool и submit для параллельного потока не позволяет надежно использовать все потоки. Если вы посмотрите на это ( параллельный поток из HashSet не работает параллельно ) и этот ( Почему параллельный поток не использует все потоки ForkJoinPool? ), вы увидите рассуждения.
короткая версия: если ForkJoinPool / submit не работает для вас, используйте
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");
до сих пор используются решения, описанные в ответах на этот вопрос. Теперь я придумал небольшую библиотеку под названием Поддержка Параллельного Потока для этого:
ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS); ParallelIntStreamSupport.range(1, 1_000_000, pool) .filter(PrimesPrint::isPrime) .collect(toList())
но, как указал @PabloMatiasGomez в комментариях, есть недостатки в отношении механизма разделения параллельных потоков, который сильно зависит от размера общего пула. Смотрите параллельный поток из HashSet не работает параллельно .
Я использую этот решение только иметь отдельные пулы для разных типов работы, но я не могу установить размер общего пула в 1, даже если я его не использую.
для того чтобы измерить фактическое количество используемых потоков, вы можете проверить
Thread.activeCount()
:Runnable r = () -> IntStream .range(-42, +42) .parallel() .map(i -> Thread.activeCount()) .max() .ifPresent(System.out::println); ForkJoinPool.commonPool().submit(r).join(); new ForkJoinPool(42).submit(r).join();
Это может производить на 4-ядерном процессоре выход типа:
5 // common pool 23 // custom pool
без
.parallel()
это дает:3 // common pool 4 // custom pool
перейти к get AbacusUtil. Номер потока может быть указан для параллельного потока. Вот пример кода:
LongStream.range(4, 1_000_000).parallel(threadNum)...
раскрытие информации: я разработчик AbacusUtil.
Если вы не возражаете использовать стороннюю библиотеку, с Циклоп-реагировать вы можете смешивать последовательные и параллельные потоки в одном конвейере и предоставлять пользовательские ForkJoinPools. Например
ReactiveSeq.range(1, 1_000_000) .foldParallel(new ForkJoinPool(10), s->s.filter(i->true) .peek(i->System.out.println("Thread " + Thread.currentThread().getId())) .max(Comparator.naturalOrder()));
или если мы хотим продолжить обработку в последовательном потоке
ReactiveSeq.range(1, 1_000_000) .parallel(new ForkJoinPool(10), s->s.filter(i->true) .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))) .map(this::processSequentially) .forEach(System.out::println);
[раскрытие я ведущий разработчик cyclops-react]
Я пробовал custom ForkJoinPool следующим образом, чтобы настроить размер пула:
private static Set<String> ThreadNameSet = new HashSet<>(); private static Callable<Long> getSum() { List<Long> aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList()); return () -> aList.parallelStream() .peek((i) -> { String threadName = Thread.currentThread().getName(); ThreadNameSet.add(threadName); }) .reduce(0L, Long::sum); } private static void testForkJoinPool() { final int parallelism = 10; ForkJoinPool forkJoinPool = null; Long result = 0L; try { forkJoinPool = new ForkJoinPool(parallelism); result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { if (forkJoinPool != null) { forkJoinPool.shutdown(); //always remember to shutdown the pool } } out.println(result); out.println(ThreadNameSet); }
вот вывод, говорящий, что пул использует больше потоков, чем по умолчанию 4.
50000005000000 [ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2]
но на самом деле есть извращенец, когда я пытался достичь того же результата, используя
ThreadPoolExecutor
следующим образом:BlockingDeque blockingDeque = new LinkedBlockingDeque(1000); ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread"));
но я потерпел неудачу.
он будет только начать parallelStream в новом потоке, а затем все остальное-то же самое, что снова подтверждает, что
parallelStream
использовать ForkJoinPool для запуска дочерних потоков.
Примечание: Похоже, что в JDK 10 реализовано исправление, которое гарантирует, что пользовательский пул потоков использует ожидаемое количество потоков.
параллельное выполнение потока в пользовательском ForkJoinPool должно подчиняться параллелизму https://bugs.openjdk.java.net/browse/JDK-8190974