Проблемы с реализацией многочисленных несколькими производителями и потребителями в Java


Я написал простую задачу потребитель-производитель с блокирующей очередью, в которой несколько производителей и несколько потребителей берут и помещают целые числа в очередь. Тем не менее, когда я попытался протестировать его, результаты не так желательны, например, размер очереди не является правильным. Я не думаю, что потребитель и производитель синхронизируются вместе. Более того, я ставлю 2-секундный сон как на производителя, так и на потребителя, но при тестировании каждые две секунды он выводит результаты 2 производителей и 2 потребитель. Кто-нибудь знает, что я делаю не так? Может быть, я неправильно начинаю нити? Я прокомментировал другой способ, которым я это сделал, но результаты все равно были неправильными.

Результаты:

run:
Producing 425     Thread-0 size left 0
Consuming 890     Thread-3 size left 0
Consuming 425     Thread-2 size left 0
Producing 890     Thread-1 size left 0
Consuming 192     Thread-2 size left 0
Consuming 155     Thread-3 size left 0
Producing 155     Thread-1 size left 0
Producing 192     Thread-0 size left 0
Consuming 141     Thread-2 size left 1
Producing 141     Thread-0 size left 0
Producing 919     Thread-1 size left 0
Consuming 919     Thread-3 size left 0
Producing 361     Thread-1 size left 0
Producing 518     Thread-0 size left 0
Consuming 518     Thread-3 size left 0
Consuming 361     Thread-2 size left 0
Producing 350     Thread-0 size left 1
Consuming 350     Thread-3 size left 0
Consuming 767     Thread-2 size left 0
Producing 767     Thread-1 size left 0

Производитель

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Producer implements Runnable {

    BlockingQueue<Integer> items = new LinkedBlockingQueue<>();

    public Producer(BlockingQueue<Integer> q) {
        this.items = q;
    }

    private int generateRandomNumber(int start, int end) {
        Random rand = new Random();
        int number = start + rand.nextInt(end - start + 1);
        return number;
    }

    public void run() {
        for (int i = 0; i < 5; i++) {
            int rand = generateRandomNumber(100, 1000);
            try {
                items.put(rand);
                System.out.println("Producing " + rand + "     " + Thread.currentThread().getName() + " size left " + items.size());
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
}

Потребитель

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Consumer implements Runnable {

    BlockingQueue<Integer> items = new LinkedBlockingQueue<>();

    public Consumer(BlockingQueue<Integer> q) {
        this.items = q;
    }

    public void run() {
        while (true) {
            try {
                System.out.println("Consuming " + items.take() + "     " + Thread.currentThread().getName() + " size left " + items.size());
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
}

Тест

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumer {
    public static void main(String args[]) {
        BlockingQueue<Integer> items = new LinkedBlockingQueue<>();

        Producer producer = new Producer(items);
        Consumer consumer = new Consumer(items);
        Thread t1 = new Thread(producer);
        Thread t2 = new Thread(producer);
        Thread t3 = new Thread(consumer);
        Thread t4 = new Thread(consumer);
        /*
        Thread t1 = new Thread(new Producer());
        Thread t2 = new Thread(new Producer());
        Thread t3 = new Thread(new Consumer());
        Thread t4 = new Thread(new Consumer());
        */
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}

UPDATE: я пытался реализовать реентерабельную блокировку, но моя программа останавливается на линии блокировки. Помощь есть? Потребительские

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Consumer implements Runnable { 

    //private BlockingQueue<Integer> items = new LinkedBlockingQueue<>(); 
    private MyBlockingQ items;

    public Consumer(MyBlockingQ q) { 
        this.items = q; 
    } 

    public void run() { 
        while (true) { 
            items.remove();
            //Thread.sleep(1000);
        }
    }
} 

Производитель

import java.util.Random;

public class Producer implements Runnable {
    private MyBlockingQ items;
    public Producer(MyBlockingQ q) {
        this.items = q;
    }

    private int generateRandomNumber(int start, int end) {
        Random rand = new Random();
        int number = start + rand.nextInt(end - start + 1);
        return number;
    }

    public void run() {
        for (int i = 0; i < 5; i++) {
            int rand = generateRandomNumber(100, 1000);
            items.add(rand);
        }
    }
}

MyBlockingQ (общий доступ resouce)

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;

public class MyBlockingQ {

    private BlockingQueue<Integer> items = new LinkedBlockingQueue<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public MyBlockingQ() {
    }

    public void add(Integer i) {
        lock.writeLock().lock();
        try {
            items.put(i);
            System.out.println("Producing " + i + "     " + Thread.currentThread().getName() + " size left " + items.size());
        } catch (InterruptedException ex) {
            Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public void remove() {
        lock.writeLock().lock();
        try {
            int taken = items.take();
            System.out.println("Consuming " + taken + "     " + Thread.currentThread().getName() + " size left " + items.size());
        } catch (InterruptedException ex) {
            Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
        } finally {
            lock.writeLock().unlock();
        }
    }
}

Тест

import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.LinkedBlockingQueue; 

public class ProducerConsumer { 
    public static void main(String args[]) { 
        MyBlockingQ items = new MyBlockingQ(); 

        System.out.println("starting");
        Thread t1 = new Thread(new Producer(items)); 
        Thread t2 = new Thread(new Producer(items)); 
        Thread t3 = new Thread(new Consumer(items)); 
        Thread t4 = new Thread(new Consumer(items)); 
        t1.start(); 
        t2.start(); 
        t3.start(); 
        t4.start(); 
    } 
} 
3 3

3 ответа:

Вас, вероятно, смущает эта часть вывода:

Producing 425     Thread-0 size left 0
Consuming 890     Thread-3 size left 0
Consuming 425     Thread-2 size left 0
Producing 890     Thread-1 size left 0

Вопрос: почему поток-3 потребляет 890 элементов, прежде чем поток-1 производит их?

ответ: поток-3 не потребляет предметы, прежде чем они были произведены, потоком-1.

Почему: Когда поток-1 помещает элементы в очередь, поток-3, вероятно, уже ожидает, что элементы будут взяты из очереди. Итак, поток-1 ставит пункты:

items.put(rand);

И перед тем, как поток-1 прыгает в следующая строка и выводит информацию о произведенных им элементах поток-3 выполняет следующую строку:

System.out.println("Consuming " + items.take() + "     " + Thread.currentThread().getName() + " size left " + items.size());

Только тогда поток-1 выполняет свой println:

System.out.println("Producing " + rand + "     " + Thread.currentThread().getName() + " size left " + items.size());
Из-за этого вы можете видеть эти забавные результаты в консоли.

Возможно, вы захотите прочитать о синхронизации. Есть 2 способа решить вашу проблему:

  • синхронизированные методы
  • синхронизированные операторы (подход, используемый brimborium)

Синхронизация блокирует доступ к объектам, которые находятся внутри синхронизированного блока. Это означает, что любой другой метод должен ждать своей очереди, прежде чем он сможет получить доступ к объекту(объектам).

Итак, если вы используете синхронизацию для элементов как в производителе, так и в потребителе, то:

    Потребитель не может брать предметы, когда их кладет производитель. Производитель не может ставить предметы, когда их принимает потребитель.

I случай, когда элементы пусты и метод потребителя блокирует элементы, программа будет впадают в так называемый тупик. Производитель должен ждать, пока потребитель разблокирует , но этого никогда не произойдет, поскольку потребитель ждет, чтобы взять предметы (которые должны быть помещены туда производителем).

Более того, я положил 2-секундный сон как на производителя, так и на потребителя, но когда тестирование, каждые две секунды он распечатывает результаты 2 производителей и 2 потребителя.

Это именно то, чего вы должны ожидать. В тестовом классе вы делаете 2 производителя и 2 потребителя.
Thread t1 = new Thread(producer);
Thread t2 = new Thread(producer);
Thread t3 = new Thread(consumer);
Thread t4 = new Thread(consumer);

t1.start();
t2.start();
t3.start();
t4.start();

Эти две строки

items.put(rand);
System.out.println("Producing " + rand + "     " + Thread.currentThread().getName() + " size left " + items.size());

Не синхронизированы. Производитель может поместить числа в очередь, но когда размер очереди отображается из потока, который поместил его, потребитель может уже использовать число.

Вам нужно синхронизировать доступ items. Я просто немного изменил ваш пример, и результат выглядит хорошо. Из-за синхронизации вам также придется позаботиться о мертвых замках. В этом случае все должно быть в порядке, пока вы не синхронизируетесь над items.take() в Consumer.

Ваш новый тест:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumer {
    public static void main(String args[]) {
        BlockingQueue<Integer> items = new LinkedBlockingQueue<>();

        Thread t1 = new Thread(new Producer(items));
        Thread t2 = new Thread(new Producer(items));
        Thread t3 = new Thread(new Consumer(items));
        Thread t4 = new Thread(new Consumer(items));
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}

Потребитель

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Consumer implements Runnable {

    BlockingQueue<Integer> items = new LinkedBlockingQueue<>();

    public Consumer(BlockingQueue<Integer> q) {
        this.items = q;
    }

    public void run() {
        while (true) {
            try {
                System.out.println("Consuming " + items.take() + "     " + Thread.currentThread().getName() + " size left " + items.size());
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
}

И продюсер

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Producer implements Runnable {

    BlockingQueue<Integer> items = new LinkedBlockingQueue<>();

    public Producer(BlockingQueue<Integer> q) {
        this.items = q;
    }

    private int generateRandomNumber(int start, int end) {
        Random rand = new Random();
        int number = start + rand.nextInt(end - start + 1);
        return number;
    }

    public void run() {
        for (int i = 0; i < 5; i++) {
            int rand = generateRandomNumber(100, 1000);
            try {
                synchronized (items) {
                    items.put(rand);
                    System.out.println("Producing " + rand + "     " + Thread.currentThread().getName() + " size left " + items.size());
                }
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
}