Очередь с высоким приоритетом по сравнению с более низкой в ruby AMQP с RabbitMQ?
Учитывая, что у меня есть рабочий, подписанный на две очереди "низкий" и "высокий", я хотел бы, чтобы рабочий работал только с сообщениями из очереди с низким приоритетом, Если очередь с высоким приоритетом пуста.
Я пытаюсь сделать это, определив два канала и установив prefetch на более высокое значение в очереди с более высоким приоритетом, как предложено здесь: http://dougbarth.github.io/2011/07/01/approximating-priority-with-rabbitmq.html
Это мой рабочий код:
require "rubygems"
require "amqp"
EventMachine.run do
connection = AMQP.connect(:host => '127.0.0.1')
channel_low = AMQP::Channel.new(connection)
channel_high = AMQP::Channel.new(connection)
# Attempting to set the prefetch higher on the high priority queue
channel_low.prefetch(10)
channel_high.prefetch(20)
low_queue = channel_low.queue("low", :auto_delete => false)
high_queue = channel_high.queue("high", :auto_delete => false)
low_queue.subscribe do |payload|
puts "#{payload}"
slow_task
end
high_queue.subscribe do |payload|
puts "#{payload}"
slow_task
end
def slow_task
# Do some slow work
sleep(1)
end
end
Когда Я запустите этот клиент против него я не вижу сообщения с высоким приоритетом, обработанные первыми:
require "rubygems"
require "amqp"
EventMachine.run do
connection = AMQP.connect(:host => '127.0.0.1')
channel = AMQP::Channel.new(connection)
low_queue = channel.queue("low")
high_queue = channel.queue("high")
exchange = channel.direct("")
10.times do |i|
message = "LOW #{i}"
puts "sending: #{message}"
exchange.publish message, :routing_key => low_queue.name
end
# EventMachine.add_periodic_timer(0.0001) do
10.times do |i|
message = "HIGH #{i}"
puts "sending: #{message}"
exchange.publish message, :routing_key => high_queue.name
end
end
Вывод:
Client >>>
sending: LOW 0
sending: LOW 1
sending: LOW 2
sending: LOW 3
sending: LOW 4
sending: LOW 5
sending: LOW 6
sending: LOW 7
sending: LOW 8
sending: LOW 9
sending: HIGH 0
sending: HIGH 1
sending: HIGH 2
sending: HIGH 3
sending: HIGH 4
sending: HIGH 5
sending: HIGH 6
sending: HIGH 7
sending: HIGH 8
sending: HIGH 9
Server >>>
HIGH 0
HIGH 1
LOW 0
LOW 1
LOW 2
HIGH 2
LOW 3
LOW 4
LOW 5
LOW 6
LOW 7
HIGH 3
LOW 8
LOW 9
HIGH 4
HIGH 5
HIGH 6
HIGH 7
HIGH 8
HIGH 9
4 ответа:
Как сказал Майкл, есть несколько проблем с вашим нынешним подходом:
- Не включение явных acks означает, что RabbitMQ считает сообщения доставленными, когда он их отправил, а не когда вы их обработали
- Ваши сообщения настолько малы, что их можно быстро доставить по сети
- ваш блок подписки вызывается, когда EventMachine считывает сетевые данные, один раз для каждого полного кадра данных, которые он считывает из сокета
- Наконец, блокировка реактора поток (со спящим режимом) будет препятствовать отправке em acks в сокет, поэтому правильное поведение не достигается.
Для реализации концепции приоритета необходимо отделить получение сетевых данных от их подтверждения. В нашем приложении (о котором я написал в блоге) мы использовали фоновый поток и приоритетную очередь для изменения порядка работы. Это вводит небольшой буфер сообщений в каждом работнике. Некоторые из этих сообщений могут быть сообщениями с низким приоритетом, которые не будет работать, пока нет сообщений с более высоким приоритетом для работы.
Вот немного измененный рабочий код, который использует рабочий поток и очередь приоритетов для получения желаемых результатов.
require "rubygems" require "amqp" require "pqueue" EventMachine.run do connection = AMQP.connect(:host => '127.0.0.1') channel_low = AMQP::Channel.new(connection) channel_high = AMQP::Channel.new(connection) # Attempting to set the prefetch higher on the high priority queue channel_low.prefetch(10) channel_high.prefetch(20) low_queue = channel_low.queue("low", :auto_delete => false) high_queue = channel_high.queue("high", :auto_delete => false) # Our priority queue for buffering messages in the worker's memory to_process = PQueue.new {|a,b| a[0] > b[0] } # The pqueue gem isn't thread safe mutex = Mutex.new # Background thread for working blocking operation. We can spin up more of # these to increase concurrency. Thread.new do loop do _, header, payload = mutex.synchronize { to_process.pop } if payload puts "#{payload}" slow_task # We need to call ack on the EM thread. EM.next_tick { header.ack } else sleep(0.1) end end end low_queue.subscribe(:ack => true) do |header, payload| mutex.synchronize { to_process << [0, header, payload] } end high_queue.subscribe(:ack => true) do |header, payload| mutex.synchronize { to_process << [10, header, payload] } end def slow_task # Do some slow work sleep(1) end end
Если требуется увеличить параллелизм, можно создать несколько фоновых потоков.
Ваш подход является одним из наиболее часто используемых обходных путей. Тем не менее, есть пара проблемы в конкретном примере размещены.
Prefetch не контролирует, какой канал имеет приоритет для поставок. Он контролирует, сколько сообщений может быть "в процессе" (непризнанным) на нем.
Это может быть использовано в качестве метода приоритизации бедных людей, однако вы используете режим автоматического подтверждения, поэтому prefetch не вступает в игру (RabbitMQ немедленно рассматривает ваши сообщения признанный как только он их рассылает).
Если вы публикуете только небольшое количество сообщений, а затем ваш пример завершает работу, гораздо более вероятно, что порядок будет сильно зависеть от того, в каком порядке вы публикуете сообщения.
Чтобы увидеть эффект настройки предварительной выборки с ручными подтверждениями, вам нужно запустить ее в течение более длительного периода времени (который зависит от скорости передачи сообщений, но, скажем, не менее минуты.
Можете ли вы попробовать что-то подобное? Я не уверен, что это производит то, что вам нужно, но стоит попробовать.
10.times do 10.times do exchange.publish "HIGH", :routing_key => high_queue.name end exchange.publish "LOW", :routing_key => low_queue.name end
Вы публикуете небольшие сообщения процессору, который, вероятно, обрабатывает их в течение нескольких миллисекунд после их поступления. Я подозреваю, что то, что вы видите здесь, - это образец шума, а не то, что вы на самом деле думаете, что видите.
Можете ли вы проверить, что порядок, в котором публикуются сообщения, соответствует вашим ожиданиям?