Как структура fork / join лучше, чем пул потоков?


каковы преимущества использования нового fork / join framework over просто разбивает большую задачу на N подзадач в начале, отправляя их в кэшированный пул потоков (от исполнители) и ждать завершения каждой задачи? Я не вижу, как использование абстракции fork/join упрощает проблему или делает решение более эффективным из того, что у нас было в течение многих лет.

например, распараллеленный алгоритм размытия в пример может быть реализовано следующим образом:

public class Blur implements Runnable {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;

    private int mBlurWidth = 15; // Processing window size, should be odd.

    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    public void run() {
        computeDirectly();
    }

    protected void computeDirectly() {
        // As in the example, omitted for brevity
    }
}

разделить в начале и отправить задачи в пул потоков:

// source image pixels are in src
// destination image pixels are in dst
// threadPool is a (cached) thread pool

int maxSize = 100000; // analogous to F-J's "sThreshold"
List<Future> futures = new ArrayList<Future>();

// Send stuff to thread pool:
for (int i = 0; i < src.length; i+= maxSize) {
    int size = Math.min(maxSize, src.length - i);
    ForkBlur task = new ForkBlur(src, i, size, dst);
    Future f = threadPool.submit(task);
    futures.add(f);
}

// Wait for all sent tasks to complete:
for (Future future : futures) {
    future.get();
}

// Done!

задачи переходят в очередь пула потоков, из которой они выполняются как рабочие потоки становятся доступными. Пока разделение достаточно гранулировано (чтобы избежать необходимости особенно ждать последней задачи), а пул потоков имеет достаточно (по крайней мере, N процессоров) потоков, все процессоры работают на полной скорости до тех пор, пока все вычисления выполнены.

Я что-то пропустила? Какова дополнительная ценность использования фреймворка fork/join?

10 106

10 ответов:

Я думаю, что основное недоразумение заключается в том, что примеры Fork/Join делают не показать работу воровство но только какой-то стандарт разделяй и властвуй.

кража работы будет выглядеть так: рабочий B закончил свою работу. Он добрый, поэтому он оглядывается и видит, что рабочий а все еще работает очень тяжело. Он подходит и спрашивает: "Эй, парень, я мог бы тебе помочь." Отклик. "Круто, у меня есть эта задача 1000 единиц. До сих пор я закончил 345 оставляя 655. Не могли бы вы поработать над номером 673 до 1000, я сделаю 346 до 672."B говорит:" Хорошо, давайте начнем, чтобы мы могли пойти в паб раньше."

вы видите-рабочие должны общаться между собой, даже когда они начали реальную работу. Это недостающая часть в примерах.

примеры с другой стороны показывают только что-то вроде "использовать субподрядчиков":

работник а: "Данг, я имею 1000 блоков работы. Слишком много для меня. Я сделаю 500 сам и субподряд 500 кому-то другому."Это продолжается до тех пор, пока большая задача не будет разбита на небольшие пакеты по 10 единиц каждый. Они будут выполнены доступными рабочими. Но если один пакет является своего рода ядовитой пилюлей и занимает значительно больше времени, чем другие пакеты-неудача, фаза разделения закончена.

единственное оставшееся различие между Fork / Join и разделением задачи авансом заключается в следующем: при разделении авансом у вас есть полная очередь работы с самого начала. Пример: 1000 единиц, порог 10, так что очередь имеет 100 записей. Эти пакеты распределяются между членами threadpool.

Fork / Join является более сложным и пытается сохранить количество пакетов в очереди меньше:

  • Шаг 1: Положите один пакет, содержащий (1...1000) в очереди
  • Шаг 2: один рабочий открывает пакет(1...1000) и заменяет его двумя пакетами: (1...500) и (501...1000).
  • Шаг 3: один рабочий всплывает пакет (500...1000) и толкает (500...750) и (751...1000).
  • шаг n: стек содержит следующие пакеты: (1..500), (500...750), (750...875)... (991..1000)
  • шаг n+1: пакет (991..1000) выскакивает и выполняется
  • шаг n+2: пакет (981..990) выскакивает и выполняется
  • шаг n+3: пакет (961..980) выскочил и разделился на (961...970) и (971..980). ....

вы видите: в Fork / Join очередь меньше (6 в Примере), а фазы" Сплит "и" работа чередованной.

когда несколько работников появляются и толкают одновременно взаимодействия не так ясно, конечно.

Если у вас есть N занятых потоков, которые все работают на 100% независимо, это будет лучше, чем N потоков в пуле Fork-Join (FJ). Но так никогда не бывает.

там не может быть в состоянии точно разделить проблему на n равных частей. Даже если вы это сделаете, планирование потоков-это какой-то способ быть справедливым. Вы будете в конечном итоге ждать самого медленного потока. Если у вас есть несколько задач, то каждый из них может работать с менее чем n-образным параллелизмом( как правило, более эффективным), но идти вверх к n-способу, когда другие задачи закончились.

Так почему бы нам просто не разрезать проблему на части FJ-размера и не создать пул потоков для этого. Типичное использование FJ разрезает проблему на мелкие кусочки. Выполнение их в случайном порядке требует большой координации на аппаратном уровне. Накладные расходы будут убийственными. В FJ задачи помещаются в очередь, которую поток считывает в последнем порядке в первом порядке (LIFO / stack), а работа воровства (в основной работе, как правило) выполняется сначала в первом порядке (FIFO / "очередь"). В результате обработка длинного массива может выполняться в основном последовательно, даже если он разбит на крошечные куски. (Это также тот случай, когда может быть нетривиально разбить проблему на небольшие куски одинакового размера в одном большом взрыве. Скажем, имея дело с некоторой формой иерархии без балансировки.)

вывод: FJ позволяет более эффективно использовать аппаратные потоки в неравномерных ситуациях, что будет всегда, если у вас есть более одного потока.

Fork / join отличается от пула потоков, потому что он реализует кражу работы. От Fork / Join

как и в любом ExecutorService, платформа fork/join распределяет задачи для рабочих потоков в пуле потоков. Структура fork / join является отличается, потому что он использует алгоритм переноса. Рабочий поток то, что заканчивается, может украсть задачи из других потоков, которые все еще заняты.

скажем, у вас есть два потока, и 4 задачи a, b, c, d, которые занимают 1, 1, 5 и 6 секунд соответственно. Первоначально A и b назначаются потоку 1, А c и d-потоку 2. В пуле потоков это займет 11 секунд. С помощью fork / join поток 1 завершается и может украсть работу из потока 2, поэтому задача d будет выполняться потоком 1. Поток 1 выполняет a, b и d, поток 2 только c. Общее время: 8 секунд, а не 11.

EDIT: как указывает Джунас, задачи не обязательно предварительно выделяются потоку. Идея fork / join - это то, что поток может разделить задачу на несколько частей. Итак, чтобы повторить вышесказанное:

У нас есть две задачи (ab) и (cd), которые занимают 2 и 11 секунд соответственно. Поток 1 начинает выполнять ab и разбивает его на две подзадачи a & b. аналогично с потоком 2 он разбивается на две подзадачи c & d. когда поток 1 закончил a & b, он может украсть d из потока 2.

все выше правильно преимущества достигаются путем кражи работы, но чтобы расширить, почему это так.

главным преимуществом является эффективная координация между рабочими потоками. Работа должна быть разделена и собрана заново, что требует координации. Как вы можете видеть в ответе A. H над каждым потоком есть свой собственный список работ. Важным свойством этого списка является то, что он отсортирован (большие задачи вверху и маленькие задачи внизу). Каждый поток выполняет задачи в нижней части его списка и крадет задачи из верхней части других списков потоков.

в результате:

  • голова и хвост списков задач могут синхронизироваться независимо, уменьшая конкуренцию в списке.
  • значительные поддеревья работы разделяются и собираются одним потоком, поэтому для этих поддеревьев не требуется координация между потоками.
  • когда нить крадет работу, она берет большой кусок, который затем подразделяет на свой собственный список
  • работа steeling означает, что нити почти полностью используются до конца процесса.

большинство других схем разделения и завоевания с использованием пулов потоков требуют большей межпотоковой связи и координации.

в этом примере Fork / Join не добавляет никакого значения, потому что разветвление не требуется, и рабочая нагрузка равномерно распределяется по рабочим потокам. Fork / Join только добавляет накладные расходы.

здесь хорошая статья на эту тему. Цитата:

в целом, мы можем сказать, что ThreadPoolExecutor является предпочтительным где рабочая нагрузка равномерно распределена по рабочим потокам. Иметь возможность чтобы гарантировать это, вам нужно точно знать, какие входные данные похоже. От напротив, ForkJoinPool обеспечивает хорошую производительность независимо от входных данных и, таким образом, является значительно более надежным решение.

конечная цель пулов потоков и Fork / Join одинаковы: оба хотят использовать доступную мощность процессора как можно лучше для максимальной пропускной способности. Максимальная пропускная способность означает, что как можно больше задач должно быть выполнено в течение длительного периода времени. Что для этого нужно сделать? (Для следующего предположим, что нет недостатка в расчетных задачах: всегда достаточно сделать для 100% использования процессора. Кроме того, я использую" CPU " эквивалентно для ядер или виртуальных ядер в случае гипер-продевать нитку).

  1. по крайней мере, должно быть столько потоков, сколько доступно процессоров, потому что запуск меньшего количества потоков оставит ядро неиспользуемым.
  2. в максимуме должно быть столько потоков, сколько доступно процессоров, потому что запуск большего количества потоков создаст дополнительную нагрузку для планировщика, который назначает процессоры различным потокам, что приводит к тому, что некоторое время процессора переходит к планировщику, а не к нашим вычислительным ресурсам задача.

таким образом, мы выяснили, что для максимальной пропускной способности нам нужно иметь точно такое же количество потоков, чем процессоры. В Примере размытия Oracle вы можете взять пул потоков фиксированного размера с количеством потоков, равным количеству доступных процессоров, или использовать пул потоков. Это не будет иметь значения, вы правы!

Итак, когда вы попадете в беду с пулами потоков? То есть если поток блокирует, потому что ваш поток ждет еще одна задача для завершения. Рассмотрим следующий пример:

class AbcAlgorithm implements Runnable {
    public void run() {
        Future<StepAResult> aFuture = threadPool.submit(new ATask());
        StepBResult bResult = stepB();
        StepAResult aResult = aFuture.get();
        stepC(aResult, bResult);
    }
}

здесь мы видим алгоритм, который состоит из трех шагов A, B и C. A и B могут выполняться независимо друг от друга, но шаг C требует результата шага A и B. Что делает этот алгоритм, так это отправляет задачу A в пул потоков и выполняет задачу b напрямую. После этого поток будет ждать выполнения задачи A и продолжит выполнение шага C. Если A и B будут завершены одновременно, то все будет выполнено штраф. Но что, если A занимает больше времени, чем B? Это может быть потому, что природа задачи A диктует это, но это также может быть так, потому что нет поток для задачи A доступен в начале, и задача A должна ждать. (Если есть только один доступный процессор, и, следовательно, ваш threadpool имеет только один поток, это даже вызовет тупик, но на данный момент это помимо точки). Дело в том, что поток, который только что выполнил задачу B блокирует весь поток. Так как у нас есть такое же количество потоков, как процессоры и один поток блокируется, что означает, что один процессор находится в режиме ожидания.

Fork / Join решает эту проблему: в структуре fork / join вы бы написали тот же алгоритм следующим образом:

class AbcAlgorithm implements Runnable {
    public void run() {
        ATask aTask = new ATask());
        aTask.fork();
        StepBResult bResult = stepB();
        StepAResult aResult = aTask.join();
        stepC(aResult, bResult);
    }
}

выглядит так же, не так ли? Однако ключ в том, что aTask.joinне будет блока. Вместо этого вот где переноса вступает в игру: поток будет искать другие задачи, которые были разветвлены в прошлое и будет продолжаться с тех пор. Сначала он проверяет, начали ли обрабатываться задачи, которые он разветвил сам. Поэтому, если A еще не был запущен другим потоком, он сделает следующий, иначе он проверит очередь других потоков и украдет их работу. Как только эта другая задача другого потока завершится, он проверит, завершен ли A сейчас. Если это выше алгоритм может вызвать stepC. Иначе он будет искать еще одну задачу, чтобы украсть. Таким образом разделение/объединение бассейнов может достигнуть 100% загрузка процессора, даже перед лицом блокировки действий.

однако есть ловушка: работа-кража возможна только для join вызов ForkJoinTasks. это не может быть сделано для внешних действий блокировки, таких как ожидание другого потока или ожидание действия ввода-вывода. Так что насчет этого, ожидание ввода-вывода для завершения является общей задачей? В этом случае, если бы мы могли добавить дополнительный поток в пул Fork / Join, который будет остановлен снова, как только действие блокировки завершится будет вторая лучшая вещь, чтобы сделать. А то ForkJoinPool может на самом деле сделать именно это, если мы используем ManagedBlocker s.

Фибоначчи

на JavaDoc для RecursiveTask является примером для вычисления чисел Фибоначчи с помощью Fork / Join. Для классического рекурсивного решения см.:

public static int fib(int n) {
    if (n <= 1) {
        return n;
    }
    return fib(n - 1) + fib(n - 2);
}

как объясняется в JavaDocs это довольно дамп способ вычисления чисел Фибоначчи, так как этот алгоритм имеет O(2^n) сложность в то время как более простые способы вероятный. Однако этот алгоритм очень прост и понятен, поэтому мы придерживаемся его. Предположим, что мы хотим ускорить это с помощью Fork/Join. Наивная реализация будет выглядеть так:

class Fibonacci extends RecursiveTask<Long> {
    private final long n;

    Fibonacci(long n) {
        this.n = n;
    }

    public Long compute() {
        if (n <= 1) {
            return n;
        }
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork();
        Fibonacci f2 = new Fibonacci(n - 2);
        return f2.compute() + f1.join();
   }
}

шаги, на которые разделена эта задача, слишком коротки, и поэтому это будет ужасно работать, но вы можете видеть, как структура обычно работает очень хорошо: два слагаемых могут быть вычислены независимо, но тогда нам нужны оба из них для построения конечного результата. Так что одна половина сделано в другом потоке. Получайте удовольствие, делая то же самое с пулами потоков, не получая тупик (возможно, но не так просто).

просто для полноты: если вы действительно хотите вычислить числа Фибоначчи, используя этот рекурсивный подход, вот оптимизированная версия:

class FibonacciBigSubtasks extends RecursiveTask<Long> {
    private final long n;

    FibonacciBigSubtasks(long n) {
        this.n = n;
    }

    public Long compute() {
        return fib(n);
    }

    private long fib(long n) {
        if (n <= 1) {
            return 1;
        }
        if (n > 10 && getSurplusQueuedTaskCount() < 2) {
            final FibonacciBigSubtasks f1 = new FibonacciBigSubtasks(n - 1);
            final FibonacciBigSubtasks f2 = new FibonacciBigSubtasks(n - 2);
            f1.fork();
            return f2.compute() + f1.join();
        } else {
            return fib(n - 1) + fib(n - 2);
        }
    }
}

это сохраняет подзадачи намного меньше, потому что они разделяются только тогда, когда n > 10 && getSurplusQueuedTaskCount() < 2 верно, что означает, что существует значительно более 100 вызовов методов (n > 10) и есть не очень человек задачи уже ждут (getSurplusQueuedTaskCount() < 2).

на моем компьютере (4 core (8 при подсчете Hyper-threading), Intel (R) Core (TM) i7-2720QM CPU @ 2.20 GHz)fib(50) занимает 64 секунды с классическим подходом и всего 18 секунд с подходом Fork/Join, который является довольно заметным выигрышем, хотя и не так много, как теоретически возможно.

резюме

  • да, в вашем примере Fork / Join не имеет преимущества перед классическим потоком бассейны.
  • Fork / Join может значительно повысить производительность при блокировке
  • Fork / Join обходит некоторые тупиковые проблемы

еще одно важное отличие заключается в том, что с помощью F-J вы можете выполнять несколько сложных фаз "соединения". Рассмотрим сортировку слиянием изhttp://faculty.ycp.edu / ~dhovemey/spring2011/cs365/lecture/lecture18.html, было бы слишком много оркестровки требуется, чтобы предварительно разделить эту работу. например, вам нужно сделать следующие вещи:

  • сортировать первый квартал
  • сортировать второй квартал
  • объединить первые 2 квартала
  • сортировка третья четверть
  • сортировать четвертый квартал
  • объединить последние 2 квартала
  • объединить 2 половинки

Как вы указываете, что вы должны сделать сортировку перед слияниями, которые касаются их и т. д.

Я смотрел, как лучше всего сделать определенную вещь для каждого из списка элементов. Я думаю, что я просто предварительно разделю список и использую стандартный пул потоков. F-J кажется наиболее полезным, когда работа не может быть предварительно разделена на достаточно независимые задачи, но могут быть рекурсивно разделены на задачи, которые независимы между собой (например, сортировка половин независима, но слияние 2 отсортированных половин в отсортированное целое не является).

F / J также имеет явное преимущество, когда у вас есть дорогие операции слияния. Поскольку он разбивается на древовидную структуру, вы выполняете только слияние log2(n), а не N слияний с линейным разделением потоков. (Это делает теоретическое предположение, что у вас столько же процессоров, сколько потоков, но все же преимущество) для домашнего задания нам пришлось объединить несколько тысяч 2D-массивов (все те же размеры), суммируя значения по каждому индексу. С помощью fork join и P процессоров время приближается к log2 (n) как P приближается к бесконечности.

1 2 3 .. 7 3 1 .... 8 5 4
4 5 6 + 2 4 3 => 6 9 9
7 8 9 .. 1 1 0 .... 8 9 9

Если проблема такова, что мы должны ждать завершения других потоков(как в случае сортировки массива или суммы массива), следует использовать fork join, как Executor(исполнители.newFixedThreadPool (2)) будет задыхаться из-за ограниченного количества потоков. Пул forkjoin создаст больше потоков в этом случае, чтобы скрыть заблокированный поток для поддержания того же параллелизма

источник: http://www.oracle.com/technetwork/articles/java/fork-join-422606.html

проблема с исполнителями для реализации алгоритмов divide и conquer не связана с созданием подзадач, потому что вызываемый объект может свободно отправлять новую подзадачу своему исполнителю и ждать ее результата синхронно или асинхронно. Проблема заключается в параллелизме: когда вызываемый объект ожидает результата другого вызываемого объекта, он переводится в состояние ожидания, что приводит к потере возможность обработки другой вызываемой очереди для выполнения.

платформа fork/join добавлена в java.утиль.параллельный пакет в Java SE 7 благодаря усилиям Дуга Леа заполняет этот пробел

источник: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html

пул пытается поддерживать достаточно активные (или доступные) потоки путем динамического добавления, приостановки или возобновления внутренних рабочих потоков, даже если некоторые задачи застопорились в ожидании присоединения к другим. Однако такие настройки не гарантируются перед лицом заблокированного ввода-вывода или другой неуправляемой синхронизации

public int getPoolSize() Возвращает количество рабочих потоков, которые были запущены, но еще не завершены. результат, возвращаемый этим методом, может отличаться от getParallelism (), когда потоки создаются для поддержания параллелизма, когда другие совместно блокируются.

Вы были бы поражены производительностью ForkJoin в приложении, таком как crawler. вот это самое лучшее учебник вы бы поучились.

логика Fork/Join очень проста: (1) отдельная (fork) каждая большая задача на более мелкие задачи; (2) обрабатывать каждую задачу в отдельном потоке (разделяя те на еще более мелкие задачи, если это необходимо); (3) присоединяйтесь результаты.