【课设】生产者-消费者问题算法 的设计与实现

发布时间 2023-06-12 13:38:01作者: 努力的夢泽

题目:生产者-消费者问题算法的设计与实现

目     录

1. 课题概述... 2

2. 合作分工... 2

3. 相关知识... 2

4. 系统分析... 2

5. 系统设计... 2

6. 系统运行测试界面截图... 2

7. 总结与心得体会... 2

8. 源程序清单... 2

 

1.    课题概述

1.1设计的目的和要求;

在现代软件业的发展下,互联网用户日渐增多,同一时间段会有非常的用户访问同一个服务器,用于用户并发操作的同步运行概念应运而生,同步要求在同一个服务器内面对多个用户能够执行并发操作,对服务器数据进行读取修改等操作,用户大体上可以简单概括为分为生产者和消费者,生产者负责产生数据,而消费者负责消耗数据,例如餐饮服务业中顾客和后厨之间的关系,也例如淘宝这类大型电商中,服务器是通过消息队列进行彼此之间的通讯的,消息队列和应用程序之间的架构关系也是生产消费者模型,生产者消费者问题算法可以用来解决这一模型中数据通讯操作。

学习该算法有助于我们深入了解程序在多线程下的并发操作和学习多线程中并发、同步相关的知识,巩固对操作系统原理的理解。上机操作加深我们学生对知识点的掌握程度和锻炼业务技能。

1.2开发环境。

操作系统:Windows 10 家庭中文版

JDK:1.8.0_171

编程实现软件:Eclipse Java EE IDE  Eclipse版本:Mars Release (4.5.0)

2.    合作分工

姓名

具体工作内容

我自己

(1)全部内容

(2)

 

(1)

(2)

 3.    相关知识

3.1进程同步机制的概念、准则

概念:在计算机中,有些资源是独占的,一次只能一个进程进行使用。在进程同步中,由于采用了多道程序设计,内存中多道程序处于并发执行状态,使得多个程序的执行结果可能会不可再现,程序这个概念已经无法描绘程序的并发执行,于是引入进程这个概念。

进程同步便是对多个相关进程在执行次序上的协调,使其并发执行的进程之间能够按照一定的规则共享系统资源,并且很好的互相合作,从而使进程的执行结果具有可再现性。

准则:

  • 空闲让进:当没有进程处于临界区,可允许一个请求进入临界区的进程立即进入自己的临界区
  • 忙则等待:当已经有进程进入了自己的临界区,其他所有企图进入临界区的进程碧玺等待
  • 有限等待:对请求访问临界资源的进程,应当保证进程在一定时间内进入自己的临界区
  • 让权等待:当进程不能进去自己的临界区,应当立即释放处理机

3.2有界缓冲池、线程的概念

  • 有界缓冲池:为了加快数据的访问将需要访问的数据放在缓冲池中,在生产者消费者问题中的有界缓冲池表示的是数据存放的地方。对于生产者,首先判断缓冲池中是否有空的缓冲区,有则再判断缓冲池是否可用,如可用则锁住缓冲区,生产一个数据并放入缓冲区中;对于消费者说下判断缓冲池中是否有有资源的缓冲区,如有则判断缓冲池是否可用,如可用则锁住缓冲区,从缓冲池中选取一个缓冲区拿取数据。有界缓冲池是指缓冲池中的缓冲区数量是有限的,对于生产者若有界缓冲池中没有可用的缓冲区即缓冲池满了即阻塞,对于消费者若缓冲池中没有可用资源即缓冲区中位空即阻塞。
  • 线程:进程是操作系统中进程保护、并发执行和资源分配的基本单位,操作系统分配资源以进程为基本单位。而线程是进程的组成部分,它代表了一条顺序的执行流。线程只拥有一点在运行中必不可省的资源。线程定义为进程内一个执行单元或一个可调度实体。

3.3线程与进程的关系

  • 进程即是一个拥有资源的独立单位,它可以独立分配地址空间,贮存和其他,又是一个可独立调度和分派的基本单位,一个进程内有多个线程。
  • 线程是进程的一部分,线程只拥有一点在运行中必不可省的资源。线程定义为进程内一个执行单元或一个可调度实体。线程占用资源少,使得产生、终止、切换线程比进程的花费少得多,线程机制也增加了通讯的有效性。线程之间的通讯时在同一个进程的地址空间内,共享主存和文件,所以操作简单。

3.4信号量机制的原理

  • 信号量机制是一种有效的进程互斥同步工具。进程的互斥通过对信号量的操作来完成。
  • 记录型信号量结构:在此结构中信号量是代表资源物理实体的数据结构,且信号量的值只能同构两个原子操作P、V操作来实现,分别代表申请资源和释放资源
  • 信号量的类型:
    • 公用/互斥信号量:一组为需互斥共享临界资源的并发进程设置,代表永久性的共享临界资源,每个进程均可对它世家P、V操作。
    • 专用/同步信号量:一组需同步协作完成任务并发进程设置,代表消耗性的专用资源,只有使用该资源的进程才能对其施加P、V操作。
  • 信号量取值意义:
    • Value > 0 :表示可供使用资源数
    • Value = 0 : 表示资源已经被占用,没有其他进程在等待
    • Value < 0 : 表示资源已被占用,还有n个进程在因等待资源而阻塞
  • 信号量机制实现进程同步所需遵循规则,并发执行进程为C、P:
    • 只有当进程C把数据送入Buffer后,P进程才能从Buffer中取出数据,否则进程P只能等待。
    • 只有当P进程从Buffer取走数据后,C进程才能将新产生的数据再存入道Buffer中去,否则C进程只能等待。

 4.    系统分析

4.1生产者-消费者问题原理

1)      生产者-消费者问题是最著名的同步问题,用以演示信号量机制运行。描述的是有一组生产者[P1,P2,P3…],一组消费者[C1,C2,C3…],他们共享一个有界缓冲池,生产者向缓冲池中存放数据,消费者从缓冲池中拿取数据,生产者-消费者问题是许多需要互相合作进程的一种抽象。过程如图所示:

 

作图 1 生产者-消费者 

2)      进程同步原理:假设缓冲池中有n个缓冲区,每个缓冲区只能存方一个数据。由于缓冲池是临界资源,只能允许一个进程对其进程操作也就是说只能允许一个生产者存放资源或者一个消费者拿取数据。这说明生产者之间、生产者和消费者之间、消费者之间互斥使用缓冲池。故设置互斥信号量mutex,代表缓冲池资源,初值1;伪代码如图:

 

作图 2 生产者消费者伪代码 

3)      这样我们就实现了进程间对临界资源的互斥访问,但由于缓冲池是有界的,所以我们还需要对缓冲池中的缓冲区中具有资源进行判断。首先生产者和消费者之间需要满足以下两个同步条件:

  • 只有在缓冲池中至少有一个缓冲区已存入消息后,消费者才能从中提取消息,否则消费者必须等待。
  • 只有缓冲池中至少有一个缓冲区是空时,生产者才能把消息放入缓冲区,否则生产者必须等待。

4)      对于第一个同步条件,设置信号量full,代表缓冲池中有资源的缓冲区的数目,对于第二个同步条件,设置信号量empty,代表缓冲池中没有资源的空缓冲区的数目。

  • 对于生产者进程在存放数据之前需要申请资源,即是否empty>0,若是则可以继续申请互斥信号量mutex,如果否的话说明当前缓冲池中没有空的缓冲区,即生产满了需要阻塞。
  • 对于消费者进程在消费数据之前需要申请资源,即是否full>0,若是则可以继续申请互斥信号量判断是否可以访问缓冲池,若为否则说明缓冲池中没有带有数据的缓冲区,此时需要消费者阻塞。

 

作图 3 生产消费者并发执行伪代码 

5.    系统设计

5.1系统中主要数据结构介绍

算法分为生产者类、消费者类、缓冲buffer类

生产者类继承线程Thread,生产者类带有一个buffer是与其他进程共享的缓冲类。在生产者类的run(即线程启动)函数中,使用while(true)无限次循环方法体,方法体中是调用buffer的add()函数,代表对缓冲池进行生产数据并存放操作,具体的信号量的判断全部都设置道缓冲池buffer类中,随后线程休眠。这样的设计目的是简化代码,生产者和消费者都是如此设计,提升代码简洁度和可读性。

消费者类与生产者类如出一辙,带有缓冲池buffer类与其他进程共享,在run()函数中调用buffer类的poll()方法,代表对缓冲池进行消费操作,具体的信号量判断在buffer中进行。

Buffer缓冲池类,带有用户输入的最大缓冲区数量MAX,缓冲区数组resource[],空的缓冲区数量empty=MAX,带有资源的缓冲区数量full=0,代表互斥信号量mutex,以及代表以及生产了第几个资源nElems。在构造方法中对buffer类初始化,resource容量大小设置为MAX+1,方便计算与直观表达。在Buffer类中对add() 、poll()方法使用关键字synchronized进行加锁,synchronized是一种同步锁,是把该方法与其他调用此方法的进程之间是互斥关系,当有一个进程成功调用了该方法即上锁,其他进程企图调用该方法会阻塞。也就是说,add() 、poll()方法作为原子操作,构建了一个互斥使用的临界资源,一次只能由一个进程进行add() 、poll()操作,相当于只能由一个进程进入缓冲池。

5.2程序流程图及说明

 

作图 4 程序执行流程

 

作图 5 生产者进行生产操作流程【即调用add()】

 

 

作图 6 消费者进行消费操作【即调用poll(0】

 6.    系统运行测试界面截图

6.1用户输入第1组初始化数据

输入数据顺序为: 生产者数目、消费者数目、缓冲区数目:

作图 7

 

预期结果为,在一般情况下运行一段时间后,由于生产者数量多而出现阻塞,但又由于缓冲区数量多于生产者数量所以出现生产者阻塞会比较晚,且有可能在程序开始时就启动了消费者进程造成次奥非洲阻塞。

6.2第1组数据输出情况截图

可以看到除去一开始调用到了消费者进程导致消费者进行阻塞外,在运行了一段时候后才出现了生产者进程的阻塞,且后续出现消费者阻塞的概率大大降低。

作图 8

 

6.3用户输入第2组初始化数据

输入数据顺序为: 生产者数目、消费者数目、缓冲区数目:

作图 9

 

预期结果:生产者数量接近缓冲区数量,所以生产者出现阻塞的该路会更大,而对于消费者来说除了在程序开始运行的时候调用了消费者进程导致的阻塞外,其他情况下阻塞的概率很低。

6.4第2组数据输出情况截图

可以看到在程序运行一段时间后生产者出现阻塞概率很更大,而消费者在后面几乎不会出现阻塞。

作图 10

 

6.5用户输入第3组初始化数据

输入数据顺序为: 生产者数目、消费者数目、缓冲区数目:

作图 11

 

消费者数量接近缓冲区数量,预期消费者进程会多次出现阻塞,而对于生产者来说出现阻塞的概率很低。

6.6第3组数据输出情况截图

可以看到消费者出现多次阻塞,而对于生产者来说出现阻塞的概率很小。

作图 12

 

6.7其他界面。

作图 13

 7.    总结与心得体会

7.1 深刻学习了在计算机中进程并发执行的机制,学习了其中运用的信号量机制。

7.2 掌握了编程实现了多线程并发操作,学习了程序同步相关的知识。

7.3 在创建生产者和消费者进程时同时传入进程的编号,在输出进程操作时显示进程编号,这样会更加直观地反应出各个线程的运行状态和了解进程同步的运行。

8.    源程序清单

  1 public class Test01 {
  2 
  3       System.out.println("----------生产者:"+ a + " 消费者: "+b +" 缓冲区:"+max + "----------");
  4 
  5       Buffer buffer = new Buffer(max);
  6 
  7       for (int i = 0; i < a; i++) {
  8 
  9          new Producer(buffer,i).start();//创建进程
 10 
 11       }
 12 
 13       for (int i = 0; i < b; i++) {
 14 
 15          new Consumer(buffer,i).start();
 16 
 17       }
 18 
 19 }
 20 
 21 class Producer extends Thread{
 22 
 23    private  boolean mutex;
 24 
 25    private Buffer buffer;//缓冲区
 26 
 27    Random rand = new Random();
 28 
 29    public Producer(Buffer buffer,int i) {
 30 
 31       this.buffer = buffer;
 32 
 33       this.i = i;
 34 
 35    }
 36 
 37    @Override
 38 
 39    public void run() {
 40 
 41       super.run();
 42 
 43       while(true){
 44 
 45          try {
 46 
 47             buffer.add(i);//生产
 48 
 49             Thread.sleep((long)rand.nextInt(1000 - 1 + 1) + 1);
 50 
 51          } catch (InterruptedException e) {      
 52 
 53             e.printStackTrace();
 54 
 55          }
 56 
 57       }
 58 
 59    }
 60 
 61 }
 62 
 63 class Consumer extends Thread{
 64 
 65    private  boolean mutex;
 66 
 67    private Buffer buffer;
 68 
 69    Random rand = new Random();
 70 
 71    public Consumer(Buffer buffer,int i) {
 72 
 73       this.buffer = buffer;this.i=I;
 74 
 75    }
 76 
 77    @Override
 78 
 79    public void run() {
 80 
 81       super.run();
 82 
 83       while(true){
 84 
 85          try {
 86 
 87             buffer.poll(i);//消费
 88 
 89             Thread.sleep((long)rand.nextInt(1000 - 1 + 1) + 1);
 90 
 91          } catch (InterruptedException e) {
 92 
 93             e.printStackTrace();
 94 
 95          }
 96 
 97       }
 98 
 99    }
100 
101 }
102 
103 class Buffer extends Thread{
104 
105    public synchronized void add(int i){//加锁,方法是生产者和消费者共有的,保障并发执行
106 
107       try {
108 
109          if(empty != 0){//有空盒子
110 
111             if(!mutex){//可用
112 
113                int pos = findEmpty();//随机找到一个可用的位置
114 
115                nElems++;
116 
117                empty--;//P(empty)
118 
119                full++;  //V(full)
120 
121                resource[pos] = nElems;
122 
123                System.out.println("生产者#” +i+”生产了第 "+nElems+" 个面包放入第 "+pos+" 个位置。");
124 
125                Thread.sleep((long)rand.nextInt(2000 - MIN + 1) + MIN);
126 
127                notify();//唤醒其他进程
128 
129             }else{
130 
131                wait();//阻塞
132 
133             }
134 
135          }else{
136 
137             System.out.println("生产者#”+i+”阻塞!");
138 
139             wait();//阻塞
140 
141          }
142 
143       } catch (InterruptedException e) {
144 
145          e.printStackTrace();
146 
147       }
148 
149    }
150 
151    public synchronized void poll(){
152 
153       try {
154 
155          //synchronized(resource){
156 
157          if(full != 0){//有面包
158 
159             if(!mutex){
160 
161                int pos = findFull();
162 
163                empty++;//V(empty)
164 
165                full--;//V(full)
166 
167                System.out.println("消费者#”+i+”消费了第 "+resource[pos]+" 个面包放回第 "+pos+" 个空盒子。");
168 
169                resource[pos] = 0;//
170 
171                Thread.sleep((long)rand.nextInt(2000 - MIN + 1) + MIN);
172 
173                notify();
174 
175             }else{
176 
177                wait();
178 
179             }
180 
181          }else{
182 
183             System.out.println("消费者#”+i+”阻塞!");
184 
185             wait();
186 
187          }
188 
189       } catch (InterruptedException e) {
190 
191          e.printStackTrace();
192 
193       }
194 
195    }
196 
197 }