生产者消费者问题

发布时间 2023-06-09 12:26:55作者: 不会上猪的树

生产者消费者问题

其实这个问题在一开始阶段只存在两个问题,但随着多线程的情况下,同步的执行顺序和临界资源的安全性也必须得以保障,之前在信号量(缓冲区槽位和计数器)和互斥锁中有单独地分开去解决生产者消费者问题,现在来去真正的解决一下这个问题:

import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumerExample {
    private static final int BUFFER_SIZE = 10;
    private static int[] buffer = new int[BUFFER_SIZE];
    private static int count = 0;
    private static Lock lock = new ReentrantLock();
    private static Condition notFull = lock.newCondition();
    private static Condition notEmpty = lock.newCondition();

    private static Semaphore mutex = new Semaphore(1);
    private static Semaphore emptySlots = new Semaphore(BUFFER_SIZE);
    private static Semaphore filledSlots = new Semaphore(0);

    public static void main(String[] args) {
        Thread producerThread = new Thread(producer);
        Thread consumerThread = new Thread(consumer);

        producerThread.start();
        consumerThread.start();
    }

    private static Runnable producer = () -> {
        try {
            int producedValue = 1;
            while (true) {
                emptySlots.acquire(); // 等待缓冲区中的空槽

                lock.lock();
                try {
                    buffer[count] = producedValue;
                    count++;
                    System.out.println("Producer produced: " + producedValue);
                    producedValue++;

                    notEmpty.signal(); // 通知消费者有可用的值
                } finally {
                    lock.unlock();
                }

                filledSlots.release(); // 增加已填充的槽数
                Thread.sleep(1000); // 休眠一段时间,模拟生产者
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };

    private static Runnable consumer = () -> {
        try {
            while (true) {
                filledSlots.acquire(); // 等待缓冲区中的空槽

                lock.lock();
                try {
                    int consumedValue = buffer[count - 1];
                    count--;
                    System.out.println("Consumer consumed: " + consumedValue);

                    notFull.signal(); // 通知生产者有一个空槽可用
                } finally {
                    lock.unlock();
                }

                emptySlots.release(); // 增加空槽数
                Thread.sleep(2000); // 模拟消费者所做的一些工作
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };
}

老规矩,来说明一下每一个变量的作用及其真正含义:

  1. BUFFER_SIZE:这是一个常量,表示缓冲区的大小,即可以存储的产品数量。
  2. buffer:这是一个整型数组,用于存储生产者生产的产品。
  3. count:这是一个整数变量,表示当前缓冲区中已经存储的产品数量。
  4. lock:这是一个互斥锁(Lock接口的实例),用于实现对临界区的互斥访问。
  5. notFull:这是一个条件变量(Condition接口的实例),用于在缓冲区满时阻塞生产者线程。
  6. notEmpty:这是一个条件变量(Condition接口的实例),用于在缓冲区空时阻塞消费者线程。
  7. mutex:这是一个信号量(Semaphore类的实例),用于实现对临界区的互斥访问,相当于互斥锁的概念。/** 这里实际没有用到 ,在之前信号量的代码中通过这样实现的 **/
  8. emptySlots:这是一个信号量(Semaphore类的实例),表示缓冲区中可用的空槽数量。
  9. filledSlots:这是一个信号量(Semaphore类的实例),表示缓冲区中已经被生产者填充的槽数量。
  10. producerThreadconsumerThread:这是用于执行生产者和消费者任务的线程对象。
  11. producerconsumer:这是两个Runnable接口的实例,分别表示生产者和消费者的任务。通过Lambda表达式的方式定义了它们的具体实现。
  • 用简短的语言称述上述操作就是:
    1. 缓冲区的大小为10,计数器count = 0,每次放入数组,意味着生产资料+1,计数器就会+1,同时信号量+1。而每次从数组拿出,意味着生产资料 -1,计数器也会-1,同时信号量 -1。
    2. 为了保证临界资源型问题,即多个线程去竞争冒险同一个资源区或资源而导致数据的覆盖问题,加入了互斥锁,这里是通过Lock实现,之前在进程中有通过wait和synchronized去实现锁的机制.即在进入缓冲区最多只有一个线程.

不是有计数器了,为什么还需要信号量?信号量的作用好像和计数器没什么区别.

因为这段代码的给出是一个简单的情况,仅仅只有一个生产者和消费者的情况,现实场景中往往通过信号量去表示其中的资源数,且单独分开表示更加规范,而且信号量具备阻塞的作用,计数器仅仅只能当作参考去评判,不能有下一步的动作和行为.

因此输出结果会是这样的:

Producer produced: 1
Consumer consumed: 1
....................

这个过程会持续不断的继续下去,因为缓冲区的大小是10,而生产者消费者一直是一进一出的状态,导致只用了一个槽位,而数值就是具体添加的资源.

  • 如果想加入多个线程也可以:

     Thread producerThread2 = new Thread(producer);
    

    这样就会出现以下这种情况,在缓冲区未被加满之前,消费者的消费速度明显更不上生产者,每次生产者都是生产一个资源,但因为两个生产者的缘故,且放置的位置永远不会冲突,导致消费者其实一直在不停的清理货舱

    Producer produced: 1
    Producer produced: 1
    Consumer consumed: 1
    Producer produced: 2
    Producer produced: 2
    Consumer consumed: 2
    Producer produced: 3
    Producer produced: 3
    Producer produced: 4
    Producer produced: 4
    Consumer consumed: 4
    Producer produced: 5
    Producer produced: 5
    Producer produced: 6
    Producer produced: 6
    Consumer consumed: 6
    Producer produced: 7
    Producer produced: 7
    Consumer consumed: 7 ---------在此处其实缓冲区就已经满了
    Producer produced: 8
    Consumer consumed: 8
    Producer produced: 8
    Consumer consumed: 8
    Producer produced: 9
    Consumer consumed: 9
    Producer produced: 9
    Consumer consumed: 9
    Producer produced: 10
    Consumer consumed: 10
    

    而在此之后便是生产者消费者一进一出的缘故了,无论是哪一个生产者去生产,都不可能出现,他们连续生产的情况,这是因为缓冲区已经被用完所以导致了这种情况

总结:来聊一下互斥锁,信号量,条件变量三者的关系和不同

  • 互斥锁与信号量有相似之处,当信号量的初值设置为1,且只允许其值在0和1之间变化时(在信号量时,便是通过这种方式实现).wait和signal的操作其实就是互斥锁中的lock和unlock操作类似,因此我们称这种信号量为二元信号量.他们二者之间的差别就在于:面向对象的不同,拥有锁的人被称为拥有者,而信号量更是一种同步机制,他不单独属于谁,也不绝对可能属于某一进程.因此两者在机制上有所不同.

    Lock

  • 信号量其实是一种抽象的封装,无论是底层的C语言还是java中的Semaphore,他们都基于计数器,他与条件变量的不同是,他们都提供了类似wait和signal的操作,而信号量是一种更高级的封装,他是由条件变量.互斥锁以及计数器共同实现的,因此我们查看Semaphore这个类时会发现有很多抽象的封装

    信号量

  • 条件变量:

    条件变量

为此,学习编程其实是一个自下向上的过程,如果能够理解整体的架构思想,在学习的路上就会方便和高效许多.