Очередь с высоким приоритетом по сравнению с более низкой в ruby AMQP с RabbitMQ?
Учитывая, что у меня есть рабочий, подписанный на две очереди "низкий" и "высокий", я хотел бы, чтобы рабочий работал только с сообщениями из очереди с низким приоритетом, Если очередь с высоким приоритетом пуста.
Я пытаюсь сделать это, определив два канала и установив prefetch на более высокое значение в очереди с более высоким приоритетом, как предложено здесь: http://dougbarth.github.io/2011/07/01/approximating-priority-with-rabbitmq.html
Это мой рабочий код:
require "rubygems"
require "amqp"
EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel_low  = AMQP::Channel.new(connection)
  channel_high  = AMQP::Channel.new(connection)
  # Attempting to set the prefetch higher on the high priority queue
  channel_low.prefetch(10)
  channel_high.prefetch(20)
  low_queue    = channel_low.queue("low", :auto_delete => false)
  high_queue    = channel_high.queue("high", :auto_delete => false)
  low_queue.subscribe do |payload|
    puts "#{payload}"
    slow_task
  end
  high_queue.subscribe do |payload|
    puts "#{payload}"
    slow_task
  end
  def slow_task
    # Do some slow work
    sleep(1)
  end
end
Когда Я запустите этот клиент против него я не вижу сообщения с высоким приоритетом, обработанные первыми:
require "rubygems"
require "amqp"
EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel  = AMQP::Channel.new(connection)
  low_queue    = channel.queue("low")
  high_queue    = channel.queue("high")
  exchange = channel.direct("")
  10.times do |i| 
    message = "LOW #{i}"
    puts "sending: #{message}"
    exchange.publish message, :routing_key => low_queue.name
  end
  # EventMachine.add_periodic_timer(0.0001) do
  10.times do |i|
    message = "HIGH #{i}"
    puts "sending: #{message}"
    exchange.publish message, :routing_key => high_queue.name
  end
end
Вывод:
Client >>>
sending: LOW 0
sending: LOW 1
sending: LOW 2
sending: LOW 3
sending: LOW 4
sending: LOW 5
sending: LOW 6
sending: LOW 7
sending: LOW 8
sending: LOW 9
sending: HIGH 0
sending: HIGH 1
sending: HIGH 2
sending: HIGH 3
sending: HIGH 4
sending: HIGH 5
sending: HIGH 6
sending: HIGH 7
sending: HIGH 8
sending: HIGH 9
Server >>>
HIGH 0
HIGH 1
LOW 0
LOW 1
LOW 2
HIGH 2
LOW 3
LOW 4
LOW 5
LOW 6
LOW 7
HIGH 3
LOW 8
LOW 9
HIGH 4
HIGH 5
HIGH 6
HIGH 7
HIGH 8
HIGH 9
4 ответа:
Как сказал Майкл, есть несколько проблем с вашим нынешним подходом:
- Не включение явных acks означает, что RabbitMQ считает сообщения доставленными, когда он их отправил, а не когда вы их обработали
- Ваши сообщения настолько малы, что их можно быстро доставить по сети
- ваш блок подписки вызывается, когда EventMachine считывает сетевые данные, один раз для каждого полного кадра данных, которые он считывает из сокета
- Наконец, блокировка реактора поток (со спящим режимом) будет препятствовать отправке em acks в сокет, поэтому правильное поведение не достигается.
Для реализации концепции приоритета необходимо отделить получение сетевых данных от их подтверждения. В нашем приложении (о котором я написал в блоге) мы использовали фоновый поток и приоритетную очередь для изменения порядка работы. Это вводит небольшой буфер сообщений в каждом работнике. Некоторые из этих сообщений могут быть сообщениями с низким приоритетом, которые не будет работать, пока нет сообщений с более высоким приоритетом для работы.
Вот немного измененный рабочий код, который использует рабочий поток и очередь приоритетов для получения желаемых результатов.
require "rubygems" require "amqp" require "pqueue" EventMachine.run do connection = AMQP.connect(:host => '127.0.0.1') channel_low = AMQP::Channel.new(connection) channel_high = AMQP::Channel.new(connection) # Attempting to set the prefetch higher on the high priority queue channel_low.prefetch(10) channel_high.prefetch(20) low_queue = channel_low.queue("low", :auto_delete => false) high_queue = channel_high.queue("high", :auto_delete => false) # Our priority queue for buffering messages in the worker's memory to_process = PQueue.new {|a,b| a[0] > b[0] } # The pqueue gem isn't thread safe mutex = Mutex.new # Background thread for working blocking operation. We can spin up more of # these to increase concurrency. Thread.new do loop do _, header, payload = mutex.synchronize { to_process.pop } if payload puts "#{payload}" slow_task # We need to call ack on the EM thread. EM.next_tick { header.ack } else sleep(0.1) end end end low_queue.subscribe(:ack => true) do |header, payload| mutex.synchronize { to_process << [0, header, payload] } end high_queue.subscribe(:ack => true) do |header, payload| mutex.synchronize { to_process << [10, header, payload] } end def slow_task # Do some slow work sleep(1) end endЕсли требуется увеличить параллелизм, можно создать несколько фоновых потоков.
Ваш подход является одним из наиболее часто используемых обходных путей. Тем не менее, есть пара проблемы в конкретном примере размещены.
Prefetch не контролирует, какой канал имеет приоритет для поставок. Он контролирует, сколько сообщений может быть "в процессе" (непризнанным) на нем.
Это может быть использовано в качестве метода приоритизации бедных людей, однако вы используете режим автоматического подтверждения, поэтому prefetch не вступает в игру (RabbitMQ немедленно рассматривает ваши сообщения признанный как только он их рассылает).
Если вы публикуете только небольшое количество сообщений, а затем ваш пример завершает работу, гораздо более вероятно, что порядок будет сильно зависеть от того, в каком порядке вы публикуете сообщения.
Чтобы увидеть эффект настройки предварительной выборки с ручными подтверждениями, вам нужно запустить ее в течение более длительного периода времени (который зависит от скорости передачи сообщений, но, скажем, не менее минуты.
Можете ли вы попробовать что-то подобное? Я не уверен, что это производит то, что вам нужно, но стоит попробовать.
10.times do 10.times do exchange.publish "HIGH", :routing_key => high_queue.name end exchange.publish "LOW", :routing_key => low_queue.name end
Вы публикуете небольшие сообщения процессору, который, вероятно, обрабатывает их в течение нескольких миллисекунд после их поступления. Я подозреваю, что то, что вы видите здесь, - это образец шума, а не то, что вы на самом деле думаете, что видите.
Можете ли вы проверить, что порядок, в котором публикуются сообщения, соответствует вашим ожиданиям?