多线程|生产者消费模型

发布时间 2023-09-04 18:50:39作者: 司丝思

在正式介绍生产者消费者模型之前,我们先来认识一下阻塞队列。

阻塞队列是特殊的队列,是在先进先出的基础上加了一些特殊的功能:

1)如果队列为空,线程要执行出队操作时,就会进入阻塞,阻塞直到另一个线程往队列里添加元素;

2)如果队列满了,线程要进行入队操作时,就会进入阻塞,直到有另一个线程从队列里取走元素。

生产者消费者模型

生产者消费者模型是什么?

生产者消费者之间不直接通讯,而是通过阻塞队列来进行通讯。生产者产出数据放入阻塞队列中,消费者需要数据则从阻塞队列中获取。生产者消费者模型有两大好处:

1)降低耦合

举个开发中典型的场景:服务器之间的相互调用,

 从游戏客户端向A服务器发送充值请求,A服务器接到充值请求之后,将计算金额的任务转发给B服务器,B服务器处理好之后将计算好的金额返回给A服务器,A服务器经处理后将充值结果返回给客户端程序。上述过程中,A服务器和B服务器之间的交互可以视为是A调用了B,此时,A和B之间的耦合是很高的,也就是关联性非常高,如果B服务器挂了,很容易引起A服务器的bug。使用生产者消费者就可以很好的解决上述问题。

 在A服务器和B服务器之间引入阻塞队列,A将请求放入阻塞队列1中,B服务器从阻塞队列1中拿去元素,处理好之后放入阻塞队列2中,A服务器从阻塞队列2中获取元素。此时A与B之间的耦合就降低了,B服务器如果挂了,不会引起A服务器的bug,A服务器只需要将元素放入阻塞队列1中,如果队列满了就进入阻塞,同样的,如果要从队列2中获取元素,如果队列为空,也阻塞等待。经过这样的处理之后,任何一个服务器挂了都不会引起另一个服务器的bug。

2)“削峰填谷”

生产者消费者模型里的阻塞队列可以认为是一个缓冲区,当一个服务器收到大量的请求,处理大量的请求服务器可能会扛不住,此时,可以将这些请求放入一个阻塞队列中,让“消费者”慢慢来处理这些请求。

代码实现阻塞队列

我们基于链表来实现一个阻塞队列,主要实现入队列和出队列功能。

 我们使用tail来标记入队列时的下标记录,放入新的数据,tail++;当有数据出队列时,head执行“++”操作,那么在[head,tail)区间都是有数据的。这里我们引入size来记录个数,size等于队列长度则为满,否则不为满。

代码如下:

class MyBlockingQueue{
    //设置变量
    public int[] item = new int[1000];
    public int head = 0;
    public int tail = 0;
    public int size = 0;
    //入队列
    public void put(int value) throws InterruptedException{
        //判断队列是否为满,为满进入阻塞,直到队列有元素被取走
        synchronized (this){
            if(size == item.length){
                //进入阻塞,这里使用wait
                this.wait();
            }
            item[tail] = value;
            tail++;
            size++;
            //数组长度有限,如何利用有限的长度放入无限的元素,这里设置当tail的值大于等于数组长度的时候,将tail置为0.
            if(tail >= item.length){
                tail = 0;
            }
            //唤醒take中的wait
            this.notify();
        }
    }

    //出队列
    public int take() throws InterruptedException{
        int res = 0;
        synchronized (this){
            //判断是否为空,为空则阻塞,这里同样也使用wait
            if(size == 0){
                this.wait();
            }
            res = item[head];
            head++;
            size--;
            //与tail一样,head在大于等于数组长度的时候要置为0.
            if(head >= item.length){
                head = 0;
            }
            //唤醒put中的wait
            this.notify();
        }
        return res;
    }