Простой список.parallelStream () в java 8 stream, похоже, не работает?


Из этого вопроса " будут ли внутренние параллельные потоки обрабатываться полностью параллельно, прежде чем рассматривать возможность распараллеливания внешнего потока?", я понял, что потоки выполняют работу-воруют. Однако я заметил, что это часто не происходит. Например, если у меня есть список, скажем, 100 000 элементов, и я пытаюсь обработать его в parallelStream (), я часто замечаю в конце, что большинство ядер моего процессора простаивают в состоянии "ожидания". (Примечание: из 100 000 элементов в список, некоторые элементы занимают много времени для обработки, в то время как другие являются быстрыми; и, список не сбалансирован, поэтому некоторые потоки могут получить "невезучие" и иметь много дел, в то время как другие получают удачу и имеют мало дел).

Итак, моя теория заключается в том, что JIT-компилятор делает начальное разделение 100 000 элементов на 16 потоков (потому что у меня есть 16 ядер), но затем в каждом потоке он просто делает простой (последовательный) цикл for (поскольку это было бы наиболее эффективным) и поэтому не работает воровство никогда не произойдет (что я и вижу).

Я думаю, что причина, по которой внутренние параллельные потоки будут обрабатываться полностью параллельно, прежде чем рассматривать распараллеливание внешнего потока? показал, что воровство работы заключается в том, что был внешний цикл, который передавал потоки внутренний цикл, который передавал поток, и поэтому в этом случае каждый внутренний цикл оценивался во время выполнения и создавал новые задачи, которые во время выполнения могли быть назначены "простаивающим" потокам. Мысли? Есть что-то я делаю не так, что бы "заставить" простой список.parallelStream() использовать для кражи работы? (Мой текущий обходной путь - попытаться сбалансировать список на основе различных эвристик, чтобы каждый поток видел, как правило, одинаковый объем работы;но это трудно предсказать....)

1 3

1 ответ:

Это не имеет ничего общего с JIT-компилятором, но с реализацией Stream API. Он разделит рабочую нагрузку на блоки, которые обрабатываются последовательно рабочими потоками. Общая стратегия состоит в том, чтобы иметь больше рабочих мест, чем рабочих потоков, чтобы обеспечить возможность кражи рабочих мест, см. ForkJoinTask.getSurplusQueuedTaskCount(), что может быть использовано для реализации такой адаптивной стратегии.

Следующий код можно использовать для определения того, сколько элементов было обработано последовательно, когда источником является ArrayList:

List<Object> list = new ArrayList<>(Collections.nCopies(10_000, ""));
System.out.println(System.getProperty("java.version"));
System.out.println(Runtime.getRuntime().availableProcessors());
System.out.println( list.parallelStream()
    .collect(
        () -> new ArrayList<>(Collections.singleton(0)),
        (l,x) -> l.replaceAll(i -> i + 1),
        List::addAll) );

На моей текущей тестовой машине он печатает:

1.8.0_60
4
[625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625]
Таким образом, существует больше блоков, чем ядер, чтобы позволить кражу работы. Однако после того, как последовательная обработка фрагмента началась, он не может быть разделен дальше, поэтому эта реализация имеет ограничения, когда время выполнения каждого элемента значительно отличается. Это всегда компромисс.