Очередь с высоким приоритетом по сравнению с более низкой в ruby AMQP с RabbitMQ?


Учитывая, что у меня есть рабочий, подписанный на две очереди "низкий" и "высокий", я хотел бы, чтобы рабочий работал только с сообщениями из очереди с низким приоритетом, Если очередь с высоким приоритетом пуста.

Я пытаюсь сделать это, определив два канала и установив prefetch на более высокое значение в очереди с более высоким приоритетом, как предложено здесь: http://dougbarth.github.io/2011/07/01/approximating-priority-with-rabbitmq.html

Это мой рабочий код:

require "rubygems"
require "amqp"

EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel_low  = AMQP::Channel.new(connection)
  channel_high  = AMQP::Channel.new(connection)

  # Attempting to set the prefetch higher on the high priority queue
  channel_low.prefetch(10)
  channel_high.prefetch(20)

  low_queue    = channel_low.queue("low", :auto_delete => false)
  high_queue    = channel_high.queue("high", :auto_delete => false)

  low_queue.subscribe do |payload|
    puts "#{payload}"
    slow_task
  end

  high_queue.subscribe do |payload|
    puts "#{payload}"
    slow_task
  end

  def slow_task
    # Do some slow work
    sleep(1)
  end
end

Когда Я запустите этот клиент против него я не вижу сообщения с высоким приоритетом, обработанные первыми:

require "rubygems"
require "amqp"

EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel  = AMQP::Channel.new(connection)

  low_queue    = channel.queue("low")
  high_queue    = channel.queue("high")
  exchange = channel.direct("")

  10.times do |i| 
    message = "LOW #{i}"
    puts "sending: #{message}"
    exchange.publish message, :routing_key => low_queue.name
  end

  # EventMachine.add_periodic_timer(0.0001) do
  10.times do |i|
    message = "HIGH #{i}"
    puts "sending: #{message}"
    exchange.publish message, :routing_key => high_queue.name
  end

end

Вывод:

Client >>>
sending: LOW 0
sending: LOW 1
sending: LOW 2
sending: LOW 3
sending: LOW 4
sending: LOW 5
sending: LOW 6
sending: LOW 7
sending: LOW 8
sending: LOW 9
sending: HIGH 0
sending: HIGH 1
sending: HIGH 2
sending: HIGH 3
sending: HIGH 4
sending: HIGH 5
sending: HIGH 6
sending: HIGH 7
sending: HIGH 8
sending: HIGH 9

Server >>>

HIGH 0
HIGH 1
LOW 0
LOW 1
LOW 2
HIGH 2
LOW 3
LOW 4
LOW 5
LOW 6
LOW 7
HIGH 3
LOW 8
LOW 9
HIGH 4
HIGH 5
HIGH 6
HIGH 7
HIGH 8
HIGH 9
4 2

4 ответа:

Как сказал Майкл, есть несколько проблем с вашим нынешним подходом:

  • Не включение явных acks означает, что RabbitMQ считает сообщения доставленными, когда он их отправил, а не когда вы их обработали
  • Ваши сообщения настолько малы, что их можно быстро доставить по сети
  • ваш блок подписки вызывается, когда EventMachine считывает сетевые данные, один раз для каждого полного кадра данных, которые он считывает из сокета
  • Наконец, блокировка реактора поток (со спящим режимом) будет препятствовать отправке em acks в сокет, поэтому правильное поведение не достигается.

Для реализации концепции приоритета необходимо отделить получение сетевых данных от их подтверждения. В нашем приложении (о котором я написал в блоге) мы использовали фоновый поток и приоритетную очередь для изменения порядка работы. Это вводит небольшой буфер сообщений в каждом работнике. Некоторые из этих сообщений могут быть сообщениями с низким приоритетом, которые не будет работать, пока нет сообщений с более высоким приоритетом для работы.

Вот немного измененный рабочий код, который использует рабочий поток и очередь приоритетов для получения желаемых результатов.

require "rubygems"
require "amqp"
require "pqueue"

EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel_low  = AMQP::Channel.new(connection)
  channel_high  = AMQP::Channel.new(connection)

  # Attempting to set the prefetch higher on the high priority queue
  channel_low.prefetch(10)
  channel_high.prefetch(20)

  low_queue    = channel_low.queue("low", :auto_delete => false)
  high_queue    = channel_high.queue("high", :auto_delete => false)

  # Our priority queue for buffering messages in the worker's memory
  to_process = PQueue.new {|a,b| a[0] > b[0] }

  # The pqueue gem isn't thread safe
  mutex = Mutex.new

  # Background thread for working blocking operation. We can spin up more of
  # these to increase concurrency.
  Thread.new do
    loop do
      _, header, payload = mutex.synchronize { to_process.pop }

      if payload
        puts "#{payload}"
        slow_task
        # We need to call ack on the EM thread.
        EM.next_tick { header.ack }
      else
        sleep(0.1)
      end
    end
  end

  low_queue.subscribe(:ack => true) do |header, payload|
    mutex.synchronize { to_process << [0, header, payload] }
  end

  high_queue.subscribe(:ack => true) do |header, payload|
    mutex.synchronize { to_process << [10, header, payload] }
  end

  def slow_task
    # Do some slow work
    sleep(1)
  end
end

Если требуется увеличить параллелизм, можно создать несколько фоновых потоков.

Ваш подход является одним из наиболее часто используемых обходных путей. Тем не менее, есть пара проблемы в конкретном примере размещены.

Prefetch не контролирует, какой канал имеет приоритет для поставок. Он контролирует, сколько сообщений может быть "в процессе" (непризнанным) на нем.

Это может быть использовано в качестве метода приоритизации бедных людей, однако вы используете режим автоматического подтверждения, поэтому prefetch не вступает в игру (RabbitMQ немедленно рассматривает ваши сообщения признанный как только он их рассылает).

Если вы публикуете только небольшое количество сообщений, а затем ваш пример завершает работу, гораздо более вероятно, что порядок будет сильно зависеть от того, в каком порядке вы публикуете сообщения.

Чтобы увидеть эффект настройки предварительной выборки с ручными подтверждениями, вам нужно запустить ее в течение более длительного периода времени (который зависит от скорости передачи сообщений, но, скажем, не менее минуты.

Можете ли вы попробовать что-то подобное? Я не уверен, что это производит то, что вам нужно, но стоит попробовать.

10.times do 
   10.times do 
      exchange.publish "HIGH", :routing_key => high_queue.name
    end  
   exchange.publish "LOW", :routing_key => low_queue.name
end

Вы публикуете небольшие сообщения процессору, который, вероятно, обрабатывает их в течение нескольких миллисекунд после их поступления. Я подозреваю, что то, что вы видите здесь, - это образец шума, а не то, что вы на самом деле думаете, что видите.

Можете ли вы проверить, что порядок, в котором публикуются сообщения, соответствует вашим ожиданиям?