Как работает шаблон разрушителя LMAX?


Я пытаюсь понять disruptor pattern. Я посмотрел видео InfoQ и попытался прочитать их статью. Я понимаю, что есть кольцевой буфер, который инициализируется как чрезвычайно большой массив, чтобы воспользоваться локальностью кэша, исключить выделение новой памяти.

похоже, что есть одно или несколько атомарных целых чисел, которые отслеживают позиции. Каждое "событие", похоже, получает уникальный идентификатор, и его позиция в кольце найдена путем поиска свой модуль по отношению к размеру кольца, etc., прием.

к сожалению, у меня нет интуитивного чувства, как это работает. Я сделал много торговых приложений и изучил актер модель, посмотрел на седа и т. д.

в своей презентации они упомянули, что этот шаблон в основном работает как маршрутизаторы; однако я не нашел хороших описаний того, как работают маршрутизаторы.

есть ли хорошие указатели на лучшее объяснение?

5 196

5 ответов:

проект кода Google делает ссылка на технический документ на реализации кольцевого буфера, однако это немного сухой, академический и жесткий собирается для тех, кто хочет узнать, как это работает. Однако есть некоторые сообщения в блоге, которые начали объяснять внутренности более читаемым способом. Существует объяснение кольцевого буфера, что является основой рисунка деструктор,описание потребительских барьеров (часть, связанная с чтение из Разрушителя) и некоторые сведения об обработке нескольких производителей доступен.

самое простое описание Разрушителя: это способ отправки сообщений между потоками наиболее эффективным способом. Он может быть использован в качестве альтернативы очереди, но он также разделяет ряд функций с SEDA и актерами.

по сравнению с очередями:

Разрушитель обеспечивает возможность передачи сообщения на другие потоки, пробуждая его при необходимости (аналогично BlockingQueue). Тем не менее, есть 3 отличия.

  1. пользователь Disruptor определяет, как хранятся сообщения, расширяя класс ввода и предоставляя фабрику для предварительного выделения. Это позволяет либо повторно использовать память (копирование), либо запись может содержать ссылку на другой объект.
  2. ввод сообщений в Разрушитель является 2-фазным процессом, сначала в кольце заявляется слот буфер, который предоставляет пользователю запись, которая может быть заполнена соответствующими данными. Затем запись должна быть зафиксирована, этот двухфазный подход необходим для обеспечения гибкого использования памяти, упомянутой выше. Именно фиксация делает сообщение видимым для потоков-потребителей.
  3. это ответственность потребителя, чтобы отслеживать сообщения, которые были потреблены из кольцевого буфера. Перемещение этой ответственности от самого кольцевого буфера помогло уменьшите количество конфликтов записи, так как каждый поток поддерживает свой собственный счетчик.

по сравнению с субъектами

модель актора ближе к разрушителю, чем большинство других моделей программирования, особенно если вы используете классы BatchConsumer/BatchHandler, которые предоставляются. Эти классы скрывают все сложности поддержания потребляемых порядковых номеров и обеспечивают набор простых обратных вызовов при возникновении важных событий. Тем не менее, есть пара тонких отличий.

  1. Разрушитель использует модель потребителя 1 thread-1, где акторы используют модель N:M, т. е. вы можете иметь столько акторов, сколько хотите, и они будут распределены по фиксированному количеству потоков (обычно 1 на ядро).
  2. интерфейс BatchHandler обеспечивает дополнительный (и очень важный) обратный вызов onEndOfBatch(). Это позволяет медленным потребителям, например тем, кто выполняет ввод-вывод для пакетной обработки событий вместе для повышения пропускной способности. Есть возможность однако, поскольку почти все другие фреймворки не предоставляют обратный вызов в конце пакета, вам нужно использовать тайм-аут для определения конца пакета, что приводит к плохой задержке.

по сравнению с седой

LMAX построил шаблон Disruptor для замены подхода на основе SEDA.

  1. главным улучшением, которое он обеспечил по сравнению с SEDA, была возможность выполнять работу параллельно. Для этого Disruptor поддерживает многократное приведение одних и тех же сообщений (в том же порядке) к нескольким потребителям. Это во избежание потребность для этапов вилки в трубопроводе.
  2. мы также позволяем потребителям ждать результатов других потребителей без необходимости ставить между ними еще один этап очереди. Потребитель может просто наблюдать порядковый номер потребителя, от которого он зависит. Это позволяет избежать необходимости соединения этапов в конвейере.

по сравнению с памятью Барьеры

другой способ думать об этом-как о структурированном, упорядоченном барьере памяти. Где барьер производителя формирует барьер записи, а барьер потребителя является барьером чтения.

Сначала мы хотели бы понять модель программирования, которую он предлагает.

есть один или несколько авторов. Есть один или несколько читателей. Существует строка записей, полностью упорядоченная от старого к новому (на фото слева направо). Авторы могут добавлять новые записи на правом конце. Каждый читатель читает записи последовательно слева направо. Читатели не могут читать писателей прошлого, очевидно.

нет понятия удаления записи. Я использую "читатель" вместо "потребитель", чтобы избежать изображение потребляемых записей. Однако мы понимаем, что записи слева от последнего читателя становятся бесполезными.

как правило, читатели могут читать одновременно и независимо. Однако мы можем объявить зависимости среди читателей. Зависимости считывателя могут быть произвольными ациклическими графами. Если читатель B зависит от читателя A, читатель B не может читать мимо читателя A.

читателя возникает зависимость, потому что читатель может комментировать записи и чтения в зависимости от B аннотацию. Например, A выполняет некоторые вычисления для записи и сохраняет результат в поле a в записи. А затем двигаться дальше, и теперь B может прочитать запись, и значение a хранимая. Если считыватель C не зависит от A, C не должен пытаться читать a.

Это действительно интересная модель программирования. Независимо от производительности, одна только модель может принести пользу множеству приложений.

конечно, главной целью LMAX является производительность. Он использует предварительно выделенное кольцо записей. Кольцо достаточно большое, но оно ограничено так, что система не будет загружена за пределы проектной мощности. Если кольцо заполнено, писатель(ы) будет ждать, пока самые медленные читатели не продвинутся и не освободят место.

объекты ввода предварительно выделены и живут вечно, чтобы уменьшить стоимость сбора мусора. Мы не вставляем новые объекты записи или удаляем старые объекты записи, вместо этого писатель запрашивает уже существующую запись, заполняет ее поля и уведомляет читателей. Это очевидно, 2-этап действий-это действительно просто атомное действие

setNewEntry(EntryPopulator);

interface EntryPopulator{ void populate(Entry existingEntry); }

предварительное выделение записей также означает, что соседние записи (очень вероятно) находятся в соседних ячейках памяти, и поскольку читатели читают записи последовательно, это важно для использования кэшей ЦП.

и много усилий, чтобы избежать блокировки, CAS, даже барьер памяти (например, использовать энергонезависимую переменную последовательности, если есть только один писатель)

для разработчиков читателей: разные аннотирующие читатели должны писать в разные поля, чтобы избежать записи раздор. (На самом деле они должны писать в разные строки кэша.) Аннотирующий читатель не должен касаться ничего, что могут прочитать другие независимые читатели. Вот почему я говорю этим читателям аннотации записи, вместо изменить записи.

Мартин Фаулер написал статью о LMAX и модели разрушителя, архитектура LMAX, что может прояснить его дальше.

Я на самом деле взял время, чтобы изучить фактический источник, из чистого любопытства, и идея за ним довольно проста. Самая последняя версия на момент написания этого поста 3.2.1.

существует буфер, хранящий предварительно выделенные события, которые будут содержать данные для чтения потребителями.

буфер поддерживается массивом флагов (целочисленным массивом) его длины, который описывает доступность слотов буфера (см. Далее подробности). Доступ к массиву осуществляется как и java#AtomicIntegerArray, поэтому для целей этого объяснения вы можете также предположить, что это один.

там может быть любое количество производителей. Когда производитель хочет записать в буфер, генерируется длинное число (как и при вызове AtomicLong#getAndIncrement, Disruptor фактически использует свою собственную реализацию, но работает таким же образом). Назовем это сгенерированным длинным producerCallId. Аналогичным образом, consumerCallId генерируется, когда потребитель заканчивает чтение a слот из буфера. Самый последний consumerCallId доступен.

(Если потребителей много, то выбирается вызов с наименьшим идентификатором.)

затем эти идентификаторы сравниваются, и если разница между ними меньше, чем на стороне буфера, производителю разрешается писать.

(Если producerCallId больше, чем недавний consumerCallId + bufferSize, это означает, что буфер заполнен, и производитель вынужден ждать шины, пока пятно не станет доступный.)

затем производителю назначается слот в буфере на основе его callId (который является prducerCallId по модулю bufferSize, но поскольку bufferSize всегда имеет степень 2 (ограничение, применяемое при создании буфера), фактически используется операция producerCallId & (bufferSize - 1)). Затем можно изменить событие в этом слоте.

(фактический алгоритм немного сложнее, включая кэширование недавнего consumerId в отдельной атомной ссылке, для оптимизации цели.)

когда событие было изменено, изменение "опубликовано". При публикации соответствующий слот в массиве флагов заполняется обновленным флагом. Значение флага-это номер цикла (producerCallId, разделенный на bufferSize (опять же, поскольку bufferSize-это мощность 2, фактическая операция-сдвиг вправо).

подобным образом может быть любое количество потребителей. Каждый раз, когда потребитель хочет получить доступ к буферу, создается consumerCallId (в зависимости от как потребители были добавлены разрушитель атомов, используемые в производстве код может быть общим или отдельным для каждого из них). Этот consumerCallId затем сравнивается с самым последним producentCallId, и если он меньше из двух, читателю разрешено прогрессировать.

(аналогично, если producerCallId равен даже consumerCallId, это означает, что буфер пуст, и потребитель вынужден ждать. Способ ожидания определяется WaitStrategy во время disruptor создание.)

для отдельных потребителей (те, у которых есть собственный генератор идентификаторов), следующее, что проверяется, - это возможность пакетного потребления. Слоты в буфере рассматриваются в порядке от одного, соответствующего consumerCallId (индекс определяется таким же образом, как и для производителей), к одному, соответствующему недавнему producerCallId.

они рассматриваются в цикле путем сравнения значения флага, записанного в массиве флагов, со значением флага, сгенерированным для consumerCallId. Если флаги совпадают, это означает, что производители, заполняющие слоты, внесли свои изменения. Если нет, цикл прерывается, и возвращается самый высокий commited changeId. Слоты от ConsumerCallId до received в changeId могут использоваться в пакетном режиме.

Если группа потребителей читает вместе (те, с общим генератором идентификаторов), каждый из них принимает только один callId, и только слот для этого одного callId проверяется и возвращается.

С в этой статье:

шаблон disruptor представляет собой очередь дозирования, подкрепленную круговым массив (т. е. кольцевой буфер), заполненный предварительно выделенной передачей объекты, которые используют память-барьеры для синхронизации производителей и потребители через последовательности.

барьеры памяти довольно трудно объяснить, и блог Триши сделал лучшую попытку, на мой взгляд, с этим сообщением: http://mechanitis.blogspot.com/2011/08/dissecting-disruptor-why-its-so-fast.html

но если вы не хотите погружаться в детали низкого уровня, вы можете просто знать, что барьеры памяти в Java реализованы через volatile ключевое слово или через java.util.concurrent.AtomicLong. Последовательности паттернов разрушителя AtomicLongs и передаются взад и вперед между производителями и потребителями через барьеры памяти вместо замков.

мне легче понять концепция через код, поэтому приведенный ниже код является простым введен С CoralQueue, что является реализацией шаблона disruptor, выполненной CoralBlocks, с которым я связан. В приведенном ниже коде вы можете увидеть, как шаблон disruptor реализует пакетирование и как кольцевой буфер (т. е. круговой массив) позволяет осуществлять связь без мусора между двумя потоками:

package com.coralblocks.coralqueue.sample.queue;

import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.MutableLong;

public class Sample {

    public static void main(String[] args) throws InterruptedException {

        final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(1024, MutableLong.class);

        Thread consumer = new Thread() {

            @Override
            public void run() {

                boolean running = true;

                while(running) {
                    long avail;
                    while((avail = queue.availableToPoll()) == 0); // busy spin
                    for(int i = 0; i < avail; i++) {
                        MutableLong ml = queue.poll();
                        if (ml.get() == -1) {
                            running = false;
                        } else {
                            System.out.println(ml.get());
                        }
                    }
                    queue.donePolling();
                }
            }

        };

        consumer.start();

        MutableLong ml;

        for(int i = 0; i < 10; i++) {
            while((ml = queue.nextToDispatch()) == null); // busy spin
            ml.set(System.nanoTime());
            queue.flush();
        }

        // send a message to stop consumer...
        while((ml = queue.nextToDispatch()) == null); // busy spin
        ml.set(-1);
        queue.flush();

        consumer.join(); // wait for the consumer thread to die...
    }
}