Пользовательский пул потоков в параллельном потоке Java 8


можно ли указать пользовательский пул потоков для Java 8 параллельный поток? Я не могу найти его нигде.

представьте себе, что у меня есть серверное приложение, и я хотел бы использовать параллельные потоки. Но приложение является большим и многопоточным, поэтому я хочу разделить его. Я не хочу медленного выполнения задачи в одном модуле задач applicationblock из другого модуля.

Если я не могу использовать разные пулы потоков для разных модулей, это значит, я не могу безопасно использовать параллельные потоки в большинстве реальных ситуаций.

попробуйте следующий пример. Есть некоторые задачи с интенсивным ЦП, выполняемые в отдельных потоках. Задачи используют параллельные потоки. Первая задача нарушена, поэтому каждый шаг занимает 1 секунду (имитируется потоком сна). Проблема в том, что другие потоки застревают и ждут завершения сломанной задачи. Это надуманный пример, но представьте себе приложение сервлета и кого-то, отправляющего длительную задачу в общий доступ вилка присоединиться бассейн.

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}
10 301

10 ответов:

на самом деле есть трюк, Как выполнить параллельную операцию в определенном пуле fork-join. Если вы выполняете его как задачу в пуле fork-join, он остается там и не использует общий.

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
forkJoinPool.submit(() ->
    //parallel task here, for example
    IntStream.range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList())
).get();

трюк основан на ForkJoinTask.вилка который указывает: "организует асинхронное выполнение этой задачи в пуле, в котором выполняется текущая задача, если это применимо, или с помощью ForkJoinPool.commonPool () если не inForkJoinPool ()"

параллельные потоки используют значение по умолчанию ForkJoinPool.commonPool, который по умолчанию имеет один меньше потоков, сколько процессоров, как возвращено Runtime.getRuntime().availableProcessors() (это означает, что параллельные потоки используют все ваши процессоры, потому что они также используют основной поток):

для приложений, которые требуют отдельных или пользовательских пулов, ForkJoinPool может быть построен с заданным целевым уровнем параллелизма; по умолчанию, равным количеству доступных процессоры.

Это также означает, что если у вас есть вложенные параллельные потоки или несколько параллельных потоков, запущенных одновременно, они все будут долю тот же бассейн. Преимущество: вы никогда не будете использовать больше, чем по умолчанию (число доступных процессоров). Недостаток: вы не можете получить "все процессоры", назначенные каждому параллельному потоку, который вы инициируете (если у вас есть более одного). (По-видимому, вы можете использовать ManagedBlocker обойти что.)

чтобы изменить способ выполнения параллельных потоков, вы можете либо

  • отправьте выполнение параллельного потока в свой собственный ForkJoinPool:yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get(); или
  • вы можете изменить размер общего пула, используя свойства системы: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") для целевого параллелизма 20 потоков.

пример последнего на моей машине, которая имеет 8 процессоров. Если я запускаю следующую программу:

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

выход это:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

таким образом, вы можете видеть, что параллельный поток обрабатывает 8 элементов одновременно, т. е. он использует 8 потоков. Однако, если я раскомментирую комментируемую строку, вывод будет:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

на этот раз параллельный поток использовал 20 потоков и все 20 элементов в потоке были обработаны одновременно.

в качестве альтернативы трюку запуска параллельных вычислений внутри вашего собственного forkJoinPool вы также можете передать этот пул в CompletableFuture.supplyAsync метод, как в:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
    forkJoinPool
);

использование ForkJoinPool и submit для параллельного потока не позволяет надежно использовать все потоки. Если вы посмотрите на это ( параллельный поток из HashSet не работает параллельно ) и этот ( Почему параллельный поток не использует все потоки ForkJoinPool? ), вы увидите рассуждения.

короткая версия: если ForkJoinPool / submit не работает для вас, используйте

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");

до сих пор используются решения, описанные в ответах на этот вопрос. Теперь я придумал небольшую библиотеку под названием Поддержка Параллельного Потока для этого:

ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
    .filter(PrimesPrint::isPrime)
    .collect(toList())

но, как указал @PabloMatiasGomez в комментариях, есть недостатки в отношении механизма разделения параллельных потоков, который сильно зависит от размера общего пула. Смотрите параллельный поток из HashSet не работает параллельно .

Я использую этот решение только иметь отдельные пулы для разных типов работы, но я не могу установить размер общего пула в 1, даже если я его не использую.

для того чтобы измерить фактическое количество используемых потоков, вы можете проверить Thread.activeCount():

    Runnable r = () -> IntStream
            .range(-42, +42)
            .parallel()
            .map(i -> Thread.activeCount())
            .max()
            .ifPresent(System.out::println);

    ForkJoinPool.commonPool().submit(r).join();
    new ForkJoinPool(42).submit(r).join();

Это может производить на 4-ядерном процессоре выход типа:

5 // common pool
23 // custom pool

без .parallel() это дает:

3 // common pool
4 // custom pool

перейти к get AbacusUtil. Номер потока может быть указан для параллельного потока. Вот пример кода:

LongStream.range(4, 1_000_000).parallel(threadNum)...

раскрытие информации: я разработчик AbacusUtil.

Если вы не возражаете использовать стороннюю библиотеку, с Циклоп-реагировать вы можете смешивать последовательные и параллельные потоки в одном конвейере и предоставлять пользовательские ForkJoinPools. Например

 ReactiveSeq.range(1, 1_000_000)
            .foldParallel(new ForkJoinPool(10),
                          s->s.filter(i->true)
                              .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))
                              .max(Comparator.naturalOrder()));

или если мы хотим продолжить обработку в последовательном потоке

 ReactiveSeq.range(1, 1_000_000)
            .parallel(new ForkJoinPool(10),
                      s->s.filter(i->true)
                          .peek(i->System.out.println("Thread " + Thread.currentThread().getId())))
            .map(this::processSequentially)
            .forEach(System.out::println);

[раскрытие я ведущий разработчик cyclops-react]

Я пробовал custom ForkJoinPool следующим образом, чтобы настроить размер пула:

private static Set<String> ThreadNameSet = new HashSet<>();
private static Callable<Long> getSum() {
    List<Long> aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList());
    return () -> aList.parallelStream()
            .peek((i) -> {
                String threadName = Thread.currentThread().getName();
                ThreadNameSet.add(threadName);
            })
            .reduce(0L, Long::sum);
}

private static void testForkJoinPool() {
    final int parallelism = 10;

    ForkJoinPool forkJoinPool = null;
    Long result = 0L;
    try {
        forkJoinPool = new ForkJoinPool(parallelism);
        result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        if (forkJoinPool != null) {
            forkJoinPool.shutdown(); //always remember to shutdown the pool
        }
    }
    out.println(result);
    out.println(ThreadNameSet);
}

вот вывод, говорящий, что пул использует больше потоков, чем по умолчанию 4.

50000005000000
[ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2]

но на самом деле есть извращенец, когда я пытался достичь того же результата, используя ThreadPoolExecutor следующим образом:

BlockingDeque blockingDeque = new LinkedBlockingDeque(1000);
ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread"));

но я потерпел неудачу.

он будет только начать parallelStream в новом потоке, а затем все остальное-то же самое, что снова подтверждает, что parallelStream использовать ForkJoinPool для запуска дочерних потоков.

Примечание: Похоже, что в JDK 10 реализовано исправление, которое гарантирует, что пользовательский пул потоков использует ожидаемое количество потоков.

параллельное выполнение потока в пользовательском ForkJoinPool должно подчиняться параллелизму https://bugs.openjdk.java.net/browse/JDK-8190974