Проблемы с реализацией многочисленных несколькими производителями и потребителями в Java
Я написал простую задачу потребитель-производитель с блокирующей очередью, в которой несколько производителей и несколько потребителей берут и помещают целые числа в очередь. Тем не менее, когда я попытался протестировать его, результаты не так желательны, например, размер очереди не является правильным. Я не думаю, что потребитель и производитель синхронизируются вместе. Более того, я ставлю 2-секундный сон как на производителя, так и на потребителя, но при тестировании каждые две секунды он выводит результаты 2 производителей и 2 потребитель. Кто-нибудь знает, что я делаю не так? Может быть, я неправильно начинаю нити? Я прокомментировал другой способ, которым я это сделал, но результаты все равно были неправильными.
Результаты:
run:
Producing 425 Thread-0 size left 0
Consuming 890 Thread-3 size left 0
Consuming 425 Thread-2 size left 0
Producing 890 Thread-1 size left 0
Consuming 192 Thread-2 size left 0
Consuming 155 Thread-3 size left 0
Producing 155 Thread-1 size left 0
Producing 192 Thread-0 size left 0
Consuming 141 Thread-2 size left 1
Producing 141 Thread-0 size left 0
Producing 919 Thread-1 size left 0
Consuming 919 Thread-3 size left 0
Producing 361 Thread-1 size left 0
Producing 518 Thread-0 size left 0
Consuming 518 Thread-3 size left 0
Consuming 361 Thread-2 size left 0
Producing 350 Thread-0 size left 1
Consuming 350 Thread-3 size left 0
Consuming 767 Thread-2 size left 0
Producing 767 Thread-1 size left 0
Производитель
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Producer implements Runnable {
BlockingQueue<Integer> items = new LinkedBlockingQueue<>();
public Producer(BlockingQueue<Integer> q) {
this.items = q;
}
private int generateRandomNumber(int start, int end) {
Random rand = new Random();
int number = start + rand.nextInt(end - start + 1);
return number;
}
public void run() {
for (int i = 0; i < 5; i++) {
int rand = generateRandomNumber(100, 1000);
try {
items.put(rand);
System.out.println("Producing " + rand + " " + Thread.currentThread().getName() + " size left " + items.size());
Thread.sleep(3000);
} catch (InterruptedException ex) {
Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
Потребитель
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Consumer implements Runnable {
BlockingQueue<Integer> items = new LinkedBlockingQueue<>();
public Consumer(BlockingQueue<Integer> q) {
this.items = q;
}
public void run() {
while (true) {
try {
System.out.println("Consuming " + items.take() + " " + Thread.currentThread().getName() + " size left " + items.size());
Thread.sleep(3000);
} catch (InterruptedException ex) {
Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
Тест
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ProducerConsumer {
public static void main(String args[]) {
BlockingQueue<Integer> items = new LinkedBlockingQueue<>();
Producer producer = new Producer(items);
Consumer consumer = new Consumer(items);
Thread t1 = new Thread(producer);
Thread t2 = new Thread(producer);
Thread t3 = new Thread(consumer);
Thread t4 = new Thread(consumer);
/*
Thread t1 = new Thread(new Producer());
Thread t2 = new Thread(new Producer());
Thread t3 = new Thread(new Consumer());
Thread t4 = new Thread(new Consumer());
*/
t1.start();
t2.start();
t3.start();
t4.start();
}
}
UPDATE: я пытался реализовать реентерабельную блокировку, но моя программа останавливается на линии блокировки. Помощь есть? Потребительские
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Consumer implements Runnable {
//private BlockingQueue<Integer> items = new LinkedBlockingQueue<>();
private MyBlockingQ items;
public Consumer(MyBlockingQ q) {
this.items = q;
}
public void run() {
while (true) {
items.remove();
//Thread.sleep(1000);
}
}
}
Производитель
import java.util.Random;
public class Producer implements Runnable {
private MyBlockingQ items;
public Producer(MyBlockingQ q) {
this.items = q;
}
private int generateRandomNumber(int start, int end) {
Random rand = new Random();
int number = start + rand.nextInt(end - start + 1);
return number;
}
public void run() {
for (int i = 0; i < 5; i++) {
int rand = generateRandomNumber(100, 1000);
items.add(rand);
}
}
}
MyBlockingQ (общий доступ resouce)
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;
public class MyBlockingQ {
private BlockingQueue<Integer> items = new LinkedBlockingQueue<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public MyBlockingQ() {
}
public void add(Integer i) {
lock.writeLock().lock();
try {
items.put(i);
System.out.println("Producing " + i + " " + Thread.currentThread().getName() + " size left " + items.size());
} catch (InterruptedException ex) {
Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
} finally {
lock.writeLock().unlock();
}
}
public void remove() {
lock.writeLock().lock();
try {
int taken = items.take();
System.out.println("Consuming " + taken + " " + Thread.currentThread().getName() + " size left " + items.size());
} catch (InterruptedException ex) {
Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
} finally {
lock.writeLock().unlock();
}
}
}
Тест
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ProducerConsumer {
public static void main(String args[]) {
MyBlockingQ items = new MyBlockingQ();
System.out.println("starting");
Thread t1 = new Thread(new Producer(items));
Thread t2 = new Thread(new Producer(items));
Thread t3 = new Thread(new Consumer(items));
Thread t4 = new Thread(new Consumer(items));
t1.start();
t2.start();
t3.start();
t4.start();
}
}
3 ответа:
Вас, вероятно, смущает эта часть вывода:
Producing 425 Thread-0 size left 0 Consuming 890 Thread-3 size left 0 Consuming 425 Thread-2 size left 0 Producing 890 Thread-1 size left 0
Вопрос: почему поток-3 потребляет 890 элементов, прежде чем поток-1 производит их?
ответ: поток-3 не потребляет предметы, прежде чем они были произведены, потоком-1.
Почему: Когда поток-1 помещает элементы в очередь, поток-3, вероятно, уже ожидает, что элементы будут взяты из очереди. Итак, поток-1 ставит пункты:
items.put(rand);
И перед тем, как поток-1 прыгает в следующая строка и выводит информацию о произведенных им элементах поток-3 выполняет следующую строку:
System.out.println("Consuming " + items.take() + " " + Thread.currentThread().getName() + " size left " + items.size());
Только тогда поток-1 выполняет свой println:
Из-за этого вы можете видеть эти забавные результаты в консоли.System.out.println("Producing " + rand + " " + Thread.currentThread().getName() + " size left " + items.size());
Возможно, вы захотите прочитать о синхронизации. Есть 2 способа решить вашу проблему:
- синхронизированные методы
- синхронизированные операторы (подход, используемый brimborium)
Синхронизация блокирует доступ к объектам, которые находятся внутри синхронизированного блока. Это означает, что любой другой метод должен ждать своей очереди, прежде чем он сможет получить доступ к объекту(объектам).
Итак, если вы используете синхронизацию для элементов как в производителе, так и в потребителе, то:
Потребитель не может брать предметы, когда их кладет производитель. Производитель не может ставить предметы, когда их принимает потребитель.
I случай, когда элементы пусты и метод потребителя блокирует элементы, программа будет впадают в так называемый тупик. Производитель должен ждать, пока потребитель разблокирует , но этого никогда не произойдет, поскольку потребитель ждет, чтобы взять предметы (которые должны быть помещены туда производителем).
Это именно то, чего вы должны ожидать. В тестовом классе вы делаете 2 производителя и 2 потребителя.Более того, я положил 2-секундный сон как на производителя, так и на потребителя, но когда тестирование, каждые две секунды он распечатывает результаты 2 производителей и 2 потребителя.
Thread t1 = new Thread(producer); Thread t2 = new Thread(producer); Thread t3 = new Thread(consumer); Thread t4 = new Thread(consumer); t1.start(); t2.start(); t3.start(); t4.start();
Эти две строки
items.put(rand); System.out.println("Producing " + rand + " " + Thread.currentThread().getName() + " size left " + items.size());
Не синхронизированы. Производитель может поместить числа в очередь, но когда размер очереди отображается из потока, который поместил его, потребитель может уже использовать число.
Вам нужно синхронизировать доступ
items
. Я просто немного изменил ваш пример, и результат выглядит хорошо. Из-за синхронизации вам также придется позаботиться о мертвых замках. В этом случае все должно быть в порядке, пока вы не синхронизируетесь надitems.take()
вConsumer
.Ваш новый тест:
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class ProducerConsumer { public static void main(String args[]) { BlockingQueue<Integer> items = new LinkedBlockingQueue<>(); Thread t1 = new Thread(new Producer(items)); Thread t2 = new Thread(new Producer(items)); Thread t3 = new Thread(new Consumer(items)); Thread t4 = new Thread(new Consumer(items)); t1.start(); t2.start(); t3.start(); t4.start(); } }
Потребитель
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; import java.util.logging.Logger; public class Consumer implements Runnable { BlockingQueue<Integer> items = new LinkedBlockingQueue<>(); public Consumer(BlockingQueue<Integer> q) { this.items = q; } public void run() { while (true) { try { System.out.println("Consuming " + items.take() + " " + Thread.currentThread().getName() + " size left " + items.size()); Thread.sleep(1000); } catch (InterruptedException ex) { Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex); } } } }
И продюсер
import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; import java.util.logging.Logger; public class Producer implements Runnable { BlockingQueue<Integer> items = new LinkedBlockingQueue<>(); public Producer(BlockingQueue<Integer> q) { this.items = q; } private int generateRandomNumber(int start, int end) { Random rand = new Random(); int number = start + rand.nextInt(end - start + 1); return number; } public void run() { for (int i = 0; i < 5; i++) { int rand = generateRandomNumber(100, 1000); try { synchronized (items) { items.put(rand); System.out.println("Producing " + rand + " " + Thread.currentThread().getName() + " size left " + items.size()); } Thread.sleep(1000); } catch (InterruptedException ex) { Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex); } } } }