LMAX Disruptor-что определяет размер пакета?


Я недавно узнал о Разрушителе LMAX и провел некоторые эксперименты. Одна вещь, которая меня озадачивает, - это параметр endOfBatch метода обработчика onEvent метода EventHandler. Рассмотрим мой следующий код. Во-первых, фиктивные классы сообщений и потребителей, которые я называю Test1 и Test1Worker:

public class Test1 {

}

public class Test1Worker implements EventHandler<Test1>{
    public void onEvent(Test1 event, long sequence, boolean endOfBatch) {
        try{
            Thread.sleep(500);
        }
        catch(Exception e){
            e.printStackTrace();
        }
        System.out.println("Received message with sequence " + sequence + ". "
                + "EndOfBatch = " + endOfBatch);
    }
}
Обратите внимание, что я поставил задержку в 500 миллисекунд только в качестве замены для некоторых реальных работ. Я также печатаю в консоли порядковый номер

И тогда мой класс драйверов (который выступает в качестве производителя) называется DisruptorTest:

public class DisruptorTest {

    private static Disruptor<Test1> bus1;

    private static ExecutorService test1Workers;

    public static void main(String[] args){             
        test1Workers = Executors.newFixedThreadPool(1);

        bus1 = new Disruptor<Test1>(new Test1Factory(), 8, test1Workers);           
        bus1.handleEventsWith(new Test1Worker());
        RingBuffer<Test1> buf1 = bus1.start();

        for (int i = 0; i < 10; i++){
            long a = System.currentTimeMillis();
            long next = buf1.next();
            long b = System.currentTimeMillis();
            System.out.println("Delay for claiming slot " + i + " is "+ (b - a));
            try {
                Test1 message = buf1.get(next);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                buf1.publish(next);
            }
        }
    }

    public static class Test1Factory implements EventFactory<Test1> {
        public Test1 newInstance() {
            return new Test1();
        }

    }   
}

Здесь, после инициализации необходимых материалов, я передаю 10 сообщений в RingBuffer (размер буфера 8) и пытаюсь контролировать пару вещей - задержку для производителя для утверждения следующего слота в RingBuffer и сообщения с их порядковыми номерами на стороне потребителя, наряду с тем, рассматривается ли определенная последовательность как конец партии.

Теперь, интересно, С 500 мс задержка, связанная с обработкой каждого сообщения, это то, что я получаю в качестве вывода:

Delay for claiming slot 0 is 0
Delay for claiming slot 1 is 0
Delay for claiming slot 2 is 0
Delay for claiming slot 3 is 0
Delay for claiming slot 4 is 0
Delay for claiming slot 5 is 0
Delay for claiming slot 6 is 0
Delay for claiming slot 7 is 0
Received message with sequence 0. EndOfBatch = true
Delay for claiming slot 8 is 505
Received message with sequence 1. EndOfBatch = false
Received message with sequence 2. EndOfBatch = false
Received message with sequence 3. EndOfBatch = false
Received message with sequence 4. EndOfBatch = false
Received message with sequence 5. EndOfBatch = false
Received message with sequence 6. EndOfBatch = false
Received message with sequence 7. EndOfBatch = true
Delay for claiming slot 9 is 3519
Received message with sequence 8. EndOfBatch = true
Received message with sequence 9. EndOfBatch = true

Однако, если я удалю время ожидания 500 мс, это то, что я получаю:

Delay for claiming slot 0 is 0
Delay for claiming slot 1 is 0
Received message with sequence 0. EndOfBatch = true
Received message with sequence 1. EndOfBatch = true
Delay for claiming slot 2 is 0
Received message with sequence 2. EndOfBatch = true
Delay for claiming slot 3 is 0
Received message with sequence 3. EndOfBatch = true
Delay for claiming slot 4 is 0
Received message with sequence 4. EndOfBatch = true
Delay for claiming slot 5 is 0
Received message with sequence 5. EndOfBatch = true
Delay for claiming slot 6 is 0
Received message with sequence 6. EndOfBatch = true
Delay for claiming slot 7 is 0
Received message with sequence 7. EndOfBatch = true
Delay for claiming slot 8 is 1
Delay for claiming slot 9 is 0
Received message with sequence 8. EndOfBatch = false
Received message with sequence 9. EndOfBatch = true  
Таким образом, похоже, что на то, считается ли определенное сообщение в конце пакета (то есть на размер пакета), влияет задержка обработки сообщения потребителем. Может быть, я веду себя глупо, но разве так должно быть? И что за этим кроется? Что определяет размер пакета в целом так или иначе? Заранее спасибо. Дайте мне знать, если что-то в моем вопросе неясно.
1 4

1 ответ:

Размер пакета определяется исключительно количеством доступных элементов. Таким образом, если в данный момент доступно больше элементов, то они будут включены в пакет. Например, если Disruptor вызывает ваш код и в очереди находится только один элемент, то вы получите один вызов с endOfBatch=true. Если в очереди есть 8 элементов, то он соберет все 8 и отправит их в одном пакете.

Вы можете видеть в приведенном ниже коде, что # записей "доступных" в очереди выбираются и, что может быть намного больше, чем" следующий " пункт. Так, например, вы в данный момент 5, ожидая слот 6, а затем прибывают 3 события, доступно будет 8, и вы получите несколько вызовов (для 6,7,8) в пакетном режиме.

Https://github.com/LMAX-Exchange/disruptor/blob/master/src/main/java/com/lmax/disruptor/BatchEventProcessor.java#L124

final long availableSequence = sequenceBarrier.waitFor(nextSequence);
while (nextSequence <= availableSequence)
{
    event = dataProvider.get(nextSequence);
    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
    nextSequence++;
}

Что касается 500 мс паузы в элементе 9, обратите внимание, что Разрушитель построен с кольцевым буфером, и вы указали количество слотов в буфере равно 8 (см. Второй параметр здесь):

bus1 = new Disruptor<Test1>(new Test1Factory(), 8, test1Workers);  

Если не все потребители потребили элемент, а ringbuffer имеет емкость (все 8 элементов заполнены), производитель будет заблокирован от размещения новых событий в буфер. Вы можете попробовать увеличить размер буфера, скажем, на 2 миллиона объектов, или убедиться, что ваш потребитель быстрее, чем производитель, поэтому очередь не заполняется (удалите спящий режим, который вы уже продемонстрировали).