Блокировка очереди и многопоточного потребителя, Как узнать, когда остановиться
у меня есть один потоковый производитель, который создает некоторые объекты задачи, которые затем добавляются в ArrayBlockingQueue
(который имеет фиксированный размер).
Я также запускаю многопоточный потребитель. Это сборка в виде фиксированного пула потоков (Executors.newFixedThreadPool(threadCount);
). Затем я отправляю некоторые intances ConsumerWorker в этот threadPool, каждый ConsumerWorker имеет ссылку на вышеупомянутый экземпляр ArrayBlockingQueue.
каждый такой работник будет делать take()
на очереди и дело с задача.
моя проблема заключается в том, что лучший способ заставить работника знать, когда больше не будет никакой работы. Другими словами, как я могу сказать рабочим, что производитель закончил добавление в очередь, и с этого момента каждый рабочий должен остановиться, когда он видит, что очередь пуста.
теперь у меня есть настройка, в которой мой продюсер инициализируется обратным вызовом, который запускается, когда он заканчивает свою работу (добавление материала в очередь). Я также держу список все потребительские работники, которых я создал и отправил в ThreadPool. Когда обратный вызов продюсера говорит мне, что продюсер закончил, я могу сказать это каждому из работников. На этом этапе они должны просто продолжать проверять, не пуста ли очередь, и когда она становится пустой, они должны остановиться, что позволяет мне изящно завершить работу пула потоков ExecutorService. Это что-то вроде этого
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(isRunning || !inputQueue.isEmpty()) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Object queueElement = inputQueue.take();
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
this.isRunning = isRunning;
}
}
проблема здесь в том, что у меня есть очевидное состояние гонки, где иногда производитель заканчивает, сигнализирует об этом, и потребительские работники останавливаются, прежде чем потреблять все в очереди.
мой вопрос в том, как лучше всего синхронизировать это, чтобы все работало нормально? Должен ли я синхронизировать всю часть, где он проверяет, работает ли производитель плюс, если очередь пуста, плюс взять что-то из очереди в одном блоке (на объекте очереди)? Я должен просто синхронизировать обновление isRunning
boolean на экземпляре ConsumerWorker? Любой другое предложение?
ОБНОВЛЕНИЕ, ВОТ РАБОЧАЯ РЕАЛИЗАЦИЯ, КОТОРУЮ Я В КОНЕЧНОМ ИТОГЕ ИСПОЛЬЗОВАЛ:
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private final static Produced POISON = new Produced(-1);
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(true) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Produced queueElement = inputQueue.take();
Thread.sleep(new Random().nextInt(100));
if(queueElement==POISON) {
break;
}
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Consumer "+Thread.currentThread().getName()+" END");
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
try {
inputQueue.put(POISON);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
это было сильно вдохновлено ответом Джонвинта ниже, с некоторыми незначительными изменениями.
=== обновление из-за комментария @vendhan.
Спасибо за ваше obeservation. Вы правы, первый фрагмент кода в этом вопросе имеет (среди других вопросов) тот, где while(isRunning || !inputQueue.isEmpty())
на самом деле не делает чувство.
в моей фактической окончательной реализации этого я делаю что-то, что ближе к вашему предложению заменить "||" (или) на "&&" (и), в том смысле, что каждый работник (потребитель) теперь только проверяет, является ли элемент, который он получил из списка, ядовитой таблеткой, и если да, то останавливается (поэтому теоретически мы можем сказать, что работник должен работать, и очередь не должна быть пустой).
6 ответов:
вы должны продолжать
take()
из очереди. Вы можете использовать ядовитую таблетку, чтобы сказать работнику остановиться. Например:private final Object POISON_PILL = new Object(); @Override public void run() { //worker loop keeps taking en element from the queue as long as the producer is still running or as //long as the queue is not empty: while(isRunning) { System.out.println("Consumer "+Thread.currentThread().getName()+" START"); try { Object queueElement = inputQueue.take(); if(queueElement == POISON_PILL) { inputQueue.add(POISON_PILL);//notify other threads to stop return; } //process queueElement } catch (Exception e) { e.printStackTrace(); } } } //this is used to signal from the main thread that he producer has finished adding stuff to the queue public void finish() { //you can also clear here if you wanted isRunning = false; inputQueue.add(POISON_PILL); }
Я бы послал рабочим специальный рабочий пакет, чтобы сигнализировать, что они должны закрыться:
public class ConsumerWorker implements Runnable{ private static final Produced DONE = new Produced(); private BlockingQueue<Produced> inputQueue; public ConsumerWorker(BlockingQueue<Produced> inputQueue) { this.inputQueue = inputQueue; } @Override public void run() { for (;;) { try { Produced item = inputQueue.take(); if (item == DONE) { inputQueue.add(item); // keep in the queue so all workers stop break; } // process `item` } catch (Exception e) { e.printStackTrace(); } } }
}
остановить рабочих, просто добавьте
ConsumerWorker.DONE
в очередь.
в вашем блоке кода , где вы пытаетесь повторно получить элемент из очереди, используйте
poll(time,unit)
вместоtake()
.try { Object queueElement = inputQueue.poll(timeout,unit); //process queueElement } catch (InterruptedException e) { if(!isRunning && queue.isEmpty()) return ; }
указав соответствующие значения таймаута, вы гарантируете, что потоки не будут продолжать блокировать в случае неудачной последовательности
isRunning
истинно- очередь становится пустой, поэтому потоки входят в заблокированное ожидание ( если используется
take()
isRunning
имеет значение false
есть несколько стратегий, которые вы могли бы использовать, но один простой-иметь подкласс задачи, который сигнализирует о завершении задания. Производитель не посылает этот сигнал напрямую. Вместо этого он ставит в очередь экземпляр этого подкласса задачи. Когда один из ваших потребителей снимает эту задачу и выполняет ее, это вызывает отправку сигнала.
мне пришлось использовать многопоточного производителя и многопоточного потребителя. Я закончил с
Scheduler -- N Producers -- M Consumers
схема, каждые два связываются через очередь (всего две очереди). Планировщик заполняет первую очередь запросами на получение данных, а затем заполняет ее N "ядовитыми таблетками". Существует счетчик активных производителей (atomic int), и последний производитель, который получает последнюю ядовитую таблетку, отправляет M ядовитых таблеток в очередь потребителей.