美团一面:如何实现一个100W ops 生产者消费者程序?

发布时间 2023-04-28 17:04:21作者: 疯狂创客圈

说在前面

在40岁老架构师 尼恩的读者社群(50+)中,最近有小伙伴拿到了一线互联网企业如极兔、有赞、希音、百度、网易的面试资格,遇到一几个很重要的面试题:

如何设计一个100W ops 生产者、消费者程序?

与之类似的、其他小伙伴遇到过的问题还有:

手写一个 生产者、消费者程序?

设计一个 高性能的 生产者、消费者程序?

这里尼恩给大家做一下系统化、体系化的线程池梳理,使得大家可以充分展示一下大家雄厚的 “技术肌肉”,让面试官爱到 “不能自已、口水直流”

也一并把这个题目以及参考答案,收入咱们的 《尼恩Java面试宝典 PDF》V61版本,供后面的小伙伴参考,提升大家的 3高 架构、设计、开发水平。

注:本文以 PDF 持续更新,最新尼恩 架构笔记、面试题 的PDF文件,请从这里获取:码云或者语雀

最佳答案至少要包括以下5个版本:

  • 版本1:不安全的生产者-消费者模式版本
  • 版本2:使用 内置锁实现的 生产者-消费者模式版本
  • 版本3:使用信号量实现(Semaphore)
  • 版本4:使用Blockingqueue 实现
  • 版本5:无锁实现生产者-消费者模式版本

什么是生产者-消费者模式

首先,来看什么是生产者-消费者问题?

生产者-消费者问题(Producer-Consumer Problem)也称有限缓冲问题(Bounded-Buffer Problem),是一个多线程同步问题的经典案例。

生产者-消费者问题描述了两个访问共享缓冲区的线程,即生产者线程和消费者线程,在实际运行时会发生的问题。生产者线程的主要功能是生成一定量的数据放到缓冲区中,然后重复此过程。消费者线程的主要功能是从缓冲区提取(或消耗)数据。

生产者―消费者问题关键是:

1)保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中为空时消耗数据。

2)保证在生产者加入过程、消费者消耗过程中,不会产生错误的数据和行为。

生产者-消费者问题不仅仅是一个多线程同步问题的经典案例,而且业内已经将解决该问题的方案,抽象成为了一种设计模式——“生产者-消费者”模式。

现在,来看看什么是生产者-消费者模式?

生产者-消费者模式是一个经典的多线程设计模式,它为多线程间的协作提供了良好的解决方案。

在生产者-消费者模式中,通常由两类线程,即生产者线程(若干个)和消费者线程(若干个)。生产者线程向数据缓冲区(DataBuffer)加入数据,消费者线程则从DataBuffer消耗数据。生产者和消费者、内存缓冲区之间的关系结构图如下:

生产者-消费者模式中,有4个关键点:

(1)生产者与生产者之间、消费者与消费者之间,对数据缓冲区的操作是并发进行的。

(2)数据缓冲区是有容量上限的。数据缓冲区满后,生产者不能再加入数据;DataBuffer空时,消费者不能再取出数据。

(3)数据缓冲区是线程安全的。在并发操作数据区的过程中,不能出现数据不一致情况;或者在多个线程并发更改共享数据后,不会造成出现脏数据的情况。

(4)生产者或者消费者线程在空闲时,需要尽可能阻塞而不是执行无效的空操作,尽量节约CPU资源。

面试题:如何实现一个100W ops 生产者、消费者程序?

尼恩提示,遇到这样的面试题,我们可以从基础的版本开始,一步一步进行性能优化。

  • 版本1:不安全的生产者-消费者模式版本
  • 版本2:使用 内置锁实现的 生产者-消费者模式版本
    顺便说说,锁的代价
  • 版本3:使用 内置锁实现的 生产者-消费者模式版本

版本1:不安全的生产者-消费者模式版本

根据生产者―消费者模式的结构图和描述先来实现一个非线程安全版本,包含了:

  • 数据缓冲区(DataBuffer)类、
  • 生产者(Producer)类、
  • 消费者(Consumer)类。

首先定义其数据缓冲区类,具体的代码如下:

//共享数据区,类定义
class NotSafeDataBuffer<T> {
    public static final int MAX_AMOUNT = 10;
    //保存具体数据元素
    private List<T> dataList = new LinkedList<>();

    //保存元素数量
    private AtomicInteger amount = new AtomicInteger(0);

    /**
     * 向数据区增加一个元素
     */
    public void add(T element) throws Exception {
        if (amount.get() > MAX_AMOUNT) {
            Print.tcfo("队列已经满了!");
            return;
        }
        dataList.add(element);
        Print.tcfo(element + "");
        amount.incrementAndGet();

        //如果数据不一致,抛出异常
        if (amount.get() != dataList.size()) {
            throw new Exception(amount + "!=" + dataList.size());
        }
    }

    /**
     * 从数据区取出一个元素
     */
    public T fetch() throws Exception {
        if (amount.get() <= 0) {
            Print.tcfo("队列已经空了!");
            return null;
        }
        T element = dataList.remove(0);
        Print.tcfo(element + "");
        amount.decrementAndGet();
        //如果数据不一致,抛出异常
        if (amount.get() != dataList.size()) {
            throw new Exception(amount + "!=" + dataList.size());
        }
        return element;
    }
}

上面的代码:

  • 在add()实例方法中,加入元素之前首先会对amount是否达到上限进行判断,如果数据区满了,则不能加入数据;
  • 在fetch()实例方法中,消耗元素前首先会对amount是否大于零进行判断,如果数据区空了,就不能取出数据。

生产者-消费者模式中,数据缓冲区(DataBuffer)类以及相应的生产、消费动作(Action)是可变的,生产者类、消费者类的执行逻辑是不同的,

那本着“分离变与不变”的软件设计基本原则,可以将生产者类、消费者类与具体的生产、消费Action解耦,

从而使得生产者类、消费者类的代码在后续可以复用,生产者、消费者逻辑与对应Action解耦后的类结构图如下:

通用Producer类组合了一个Callable类型的成员action实例,代表了生产数据所需要执行的实际动作,需要在构造Producer实例时传入。

通用生产者类的代码具体如下:

/**
 * 生产者任务的定义
 * Created by 尼恩@疯狂创客圈.  源码来自 《Java高并发核心编程 卷2 加强版》
 */
public class Producer implements Runnable {
    //生产的时间间隔,产一次等待的时间,默认为200ms
    public static final int PRODUCE_GAP = 200;

    //总次数
    // 注意:
    // 不是单个的次数
    // 是所有生产者的总的生产次数
    static final AtomicInteger TURN = new AtomicInteger(0);

    //生产者对象编号
    static final AtomicInteger PRODUCER_NO = new AtomicInteger(1);

    //生产者名称
    String name = null;

    //生产的动作
    Callable action = null;

    int gap = PRODUCE_GAP;

    public Producer(Callable action, int gap) {
        this.action = action;
        this.gap = gap;
        if (this.gap <= 0) {
            this.gap = PRODUCE_GAP;
        }
        name = "生产者-" + PRODUCER_NO.incrementAndGet();

    }

    public Producer(Callable action) {
        this.action = action;
        this.gap = PRODUCE_GAP;
        name = "生产者-" + PRODUCER_NO.incrementAndGet();

    }

    @Override
    public void run() {
        while (true) {

            try {
                //执行生产动作
                Object out = action.call();
                //输出生产的结果
                if (null != out) {
                    Print.tcfo("第" + TURN.get() + "轮生产:" + out);
                }
                //每一轮生产之后,稍微等待一下
                sleepMilliSeconds(gap);

                //增加生产轮次
                TURN.incrementAndGet();

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

通用Consumer类也组合了一个Callable类型的成员action实例,代表了消费者所需要执行的实际消耗动作,需要在构造Consumer实例时传入。

通用Consumer类的代码具体如下:

/**
 * 消费者任务的定义
 * Created by 尼恩@疯狂创客圈.   源码来自 《Java高并发核心编程 卷2 加强版》
 */
public class Consumer implements Runnable {

    //消费的时间间隔,默认等待100毫秒
    public static final int CONSUME_GAP = 100;


    //消费总次数
    // 注意:
    // 不是单个消费者的次数
    // 是所有消费者的总的消费次数
    static final AtomicInteger TURN = new AtomicInteger(0);

    //消费者对象编号
    static final AtomicInteger CONSUMER_NO = new AtomicInteger(1);

    //消费者名称
    String name;

    //消费的动作
    Callable action = null;

    //消费一次等待的时间,默认为1000ms
    int gap = CONSUME_GAP;

    public Consumer(Callable action, int gap) {
        this.action = action;
        this.gap = gap;
        name = "消费者-" + CONSUMER_NO.incrementAndGet();

    }

    public Consumer(Callable action) {
        this.action = action;
        this.gap = gap;
        this.gap = CONSUME_GAP;
        name = "消费者-" + CONSUMER_NO.incrementAndGet();
    }

    @Override
    public void run() {
        while (true) {
            //增加消费次数
            TURN.incrementAndGet();
            try {
                //执行消费动作
                Object out = action.call();
                if (null != out) {
                    Print.tcfo("第" + TURN.get() + "轮消费:" + out);
                }
                //每一轮消费之后,稍微等待一下
                sleepMilliSeconds(gap);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

在完成了数据缓冲区类的定义、生产者类定义、消费者类的定义之后,

接下来定义一下数据缓冲区实例、生产动作和消费动作,具体的代码如下:

// Created by 尼恩@疯狂创客圈.   源码来自 《Java高并发核心编程 卷2 加强版》

public class NotSafePetStore {
    //共享数据区,实例对象
    private static NotSafeDataBuffer<IGoods> notSafeDataBuffer = new NotSafeDataBuffer();

    //生产者执行的动作
    static Callable<IGoods> produceAction = () ->
    {
        //首先生成一个随机的商品
        IGoods goods = Goods.produceOne();
        //将商品加上共享数据区
        try {
            notSafeDataBuffer.add(goods);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return goods;
    };
    //消费者执行的动作
    static Callable<IGoods> consumerAction = () ->
    {
        // 从PetStore获取商品
        IGoods goods = null;
        try {
            goods = notSafeDataBuffer.fetch();

        } catch (Exception e) {
            e.printStackTrace();
        }
        return goods;
    };
}

利用以上NotSafePetStore类所定义的三个静态成员,可以快速组装出一个简单的生产者-消费者模式的Java实现版本,具体的代码如下:


// Created by 尼恩@疯狂创客圈.   源码来自 《Java高并发核心编程 卷2 加强版》

public static void main(String[] args) throws InterruptedException {
    System.setErr(System.out);

    // 同时并发执行的线程数
    final int THREAD_TOTAL = 20;
    //线程池,用于多线程模拟测试
    ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_TOTAL);
    for (int i = 0; i < 5; i++) {
        //生产者线程每生产一个商品,间隔500ms
        threadPool.submit(new Producer(produceAction, 500));
        //消费者线程每消费一个商品,间隔1500ms
        threadPool.submit(new Consumer(consumerAction, 1500));
    }
}

在NotSafePetStore的main()方法中,利用for循环向线程池提交了5个生产者线程和5个消费者实例。

每个生产者实例生产一个商品间隔500毫秒;消费者实例每消费一个商品间隔1500毫秒;

也就是说,生产的速度大于消费的速度。

执行结果如下:

从以上异常可以看出,在向数据缓冲区进行元素的增加或者提取时,多个线程在并发执行对amount、dataList两个成员操作时次序已经混乱,导致了数据不一致和线程安全问题。

版本2:使用 内置锁实现的 生产者-消费者模式版本

前面说了,为了实现 线程安全的生产者-消费者模式实现,可以使用锁来实现。

  • 比较低级的办法是用内置锁,也就是synchronized+wait+notify方式解决
  • 更加高级一点的版本,使用显示锁 比如信号量(Semaphore)、Blockingqueue
  • 当然,终极版本是无锁编程 disruptor 的方式来解决

当然,咱们这个小节,仅仅关注有锁版本,后面的小节,再无锁编程。

咱们就挨个来看下。先看 内置锁实现的 生产者-消费者模式版本

synchronized+wait()+notify() 的实现方式

使用synchronized解决生产者和消费者模式,首先我们需要找出临界区资源和临界区代码块。

首先,我们来看下什么是临界区资源。临界区资源表示一种可以被多个线程使用的公共资源或共享数据,但是每一次只能有一个线程使用它。一旦临界区资源被占用,想使用该资源的其他线程则必须等待。在并发情况下,临界区资源是受保护的对象。

接下来,我们再来看下什么是临界区代码块。临界区代码段(Critical Section)是每个线程中访问临界资源的那段代码,多个线程必须互斥地对临界区资源进行访问。线程进入临界区代码段之前,必须在进入区申请资源,申请成功之后进行临界区代码段,执行完成之后释放资源。临界区代码段的进入和退出如下:

最后,我们来看下竟态条件(Race Conditions)可能是由于在访问临界区代码段时没有互斥地访问而导致的特殊情况。

如果多个线程在临界区代码段的并发执行结果可能因为代码的执行顺序不同而出现不同的结果,我们就说这时在临界区出现了竞态条件问题。

那咱们回过头来看生产者-消费者模式, 这个模式中, 生产者和消费者都需要操作DataBuffer(数据缓冲区)中,可以知道,临界区代码段在DataBuffer(数据缓冲区)中。

在数据缓冲区中,主要是数据进行操作, 那么 由两个临界区资源,分别是amount和dataList。

由生产者-消费者模式的关键点我们可知, 生产者与生产者之间、消费者与消费者之间,对数据缓冲区的操作是并发进行的。

那么添加数据和消耗数据是临界区代码,即其add()和fetch()两个方法。

那么创建建一个安全的数据缓存区类SafeDataBuffer类,在其add()和fetch()两个实例方法的public声明后面加上synchronized关键字即可。那线程安全的SafeDataBuffer类代码如下:

// Created by 尼恩@疯狂创客圈.   源码来自 《Java高并发核心编程 卷2 加强版》

//共享数据区,类定义
class SafeDataBuffer<T> {
    public static final int MAX_AMOUNT = 10;
    private List<T> dataList = new LinkedList<>();

    //保存数量
    private AtomicInteger amount = new AtomicInteger(0);

    /**
     * 向数据区增加一个元素
     */
    public synchronized void add(T element) throws Exception {
        if (amount.get() > MAX_AMOUNT) {
            Print.tcfo("队列已经满了!");
            return;
        }
        dataList.add(element);
        Print.tcfo(element + "");
        amount.incrementAndGet();

        //如果数据不一致,抛出异常
        if (amount.get() != dataList.size()) {
            throw new Exception(amount + "!=" + dataList.size());
        }
    }

    /**
     * 从数据区取出一个元素
     */
    public synchronized T fetch() throws Exception {
        if (amount.get() <= 0) {
            Print.tcfo("队列已经空了!");
            return null;
        }
        T element = dataList.remove(0);
        Print.tcfo(element + "");
        amount.decrementAndGet();
        //如果数据不一致,抛出异常
        if (amount.get() != dataList.size()) {
            throw new Exception(amount + "!=" + dataList.size());
        }
        return element;
    }
}

由于其他的代码没有发生变化,我们执行看下结果:

运行这个线程安全的生产者-消费者模式的实现版本,

等待一段时间,之前出现的amount数量和dataList的长度不相等的受检异常没有再抛出;

另外,之前出现的数据不一致情况以及线程安全问题也被完全解除。

目前的SafeDataBuffer类中,还存在一个性能的问题:消费者每一轮消费,不管数据区是否为空,都需要进行数据区的询问和判断。

循环的代码如下:


// Created by 尼恩@疯狂创客圈.   源码来自 《Java高并发核心编程 卷2 加强版》

/**
 * 从数据区取出一个元素
 */
public synchronized T fetch() throws Exception {
    if (amount.get() <= 0) {
        Print.tcfo("队列已经空了!");
        return null;
    }
    ....
}

当数据区空时(amount <= 0),消费者无法取出数据,但是仍然做一个无用的数据区询问工作,白白耗费了CPU的时间片

对于生产者来说,也存在类似的无效轮询问题。

当数据区满时,生产者无法加入数据,这时候生产者执行add(T element)方法也白白耗费了CPU的时间片。


// Created by 尼恩@疯狂创客圈.   源码来自 《Java高并发核心编程 卷2 加强版》

/**
 * 向数据区增加一个元素
 */
public synchronized void add(T element) throws Exception {
    if (amount.get() > MAX_AMOUNT) {
        Print.tcfo("队列已经满了!");
        return;
    }
    ....
}

在生产者或者消费者空闲时节约CPU时间片,免去巨大的CPU资源浪费的方法是使用“等待-通知”方式进行生产者与消费者之间的线程通信。

具体实现:

(1)在数据区满(amount.get() > MAX_AMOUNT)时,可以让生产者等待,等到下次数据区中可以加入数据时,给生产者发通知,让生产者唤醒。

(2)在数据区空(amount <= 0)时,可以让消费者等待,等到下次数据区中可以取出数据时,消费者才能被唤醒。

(3)可以在消费者取出一个数据后,由消费者去唤醒等待的生产者。

(4)可以在生产者加入一个数据后,由生产者去唤醒等待的消费者。

Java语言中“等待-通知”方式的线程间的通信使用对象的wait()、notify()两类方法来实现。

每个Java对象都有wait()、notify()两类实例方法,并且wait()、notify()方法和对象的监视器是紧密相关的。

Java对象中的wait()、notify()两类方法就如同信号开关,用来进行等待方和通知方之间的交互。

对象的wait()方法的主要作用是让当前线程阻塞并等待被唤醒。wait()方法与对象监视器紧密相关,使用wait()方法时也一定需要放在同步块中。

wait()方法的调用方法如下:


// Created by 尼恩@疯狂创客圈.   源码来自 《Java高并发核心编程 卷2 加强版》

synchronized(locko)
{
    //同步保护的代码块
    locko.wait();
        ...
}

对象的notify()方法的主要作用是唤醒在等待的线程。notify()方法与对象监视器紧密相关,使用notify()方法时也需要放在同步块中。notify()方法的调用方法如下:


// Created by 尼恩@疯狂创客圈.   源码来自 《Java高并发核心编程 卷2 加强版》

synchronized(locko)
{
    //同步保护的代码块
    locko.notify();
        ...
}

为了避免空轮询导致CPU时间片浪费,提高生产者-消费者实现版本的性能,接下来演示使用“等待-通知”的方式在生产者与消费者之间进行线程间通信。

使用“等待-通知”机制通信的生产者-消费者实现版本定义三个同步对象,具体如下:

(1)LOCK_OBJECT:用于临界区同步,临界区资源为数据缓冲区的dataList变量和amount 变量。

(2)NOT_FULL:用于数据缓冲区的未满条件等待和通知。生产者在添加元素前,需要判断数据区是否已满,如果是,生产者进入NOT_FULL的同步区去等待被通知,只要消费者消耗一个元素,数据区就是未满的,进入NOT_FULL的同步区发送通知。

(3)NOT_EMPTY:用于数据缓冲区的非空条件等待和通知。消费者在消耗元素前需要判断数据区是否已空,如果是,消费者进入NOT_EMPTY的同步区等待被通知,只要生产者添加一个元素,数据区就是非空的,生产者会进入NOT_EMPTY的同步区发送通知。

具体代码如下:


// Created by 尼恩@疯狂创客圈.   源码来自 《Java高并发核心编程 卷2 加强版》

public class CommunicatePetStore {

    public static final int MAX_AMOUNT = 10; //数据区长度

    //共享数据区,类定义
    static class DateBuffer<T> {
        //保存数据
        private List<T> dataList = new LinkedList<>();
        //保存数量
        private volatile int amount = 0;

        private final Object LOCK_OBJECT = new Object();
        private final Object NOT_FULL = new Object();
        private final Object NOT_EMPTY = new Object();

        // 向数据区增加一个元素
        public void add(T element) throws Exception {
            synchronized (NOT_FULL) {
                while (amount >= MAX_AMOUNT) {
                    Print.tcfo("队列已经满了!");
                    //等待未满通知
                    NOT_FULL.wait();
                }
            }
            synchronized (LOCK_OBJECT) {

                if (amount < MAX_AMOUNT) { // 加上双重检查,模拟双检锁在单例模式中应用
                    dataList.add(element);
                    amount++;
                }
            }
            synchronized (NOT_EMPTY) {
                //发送未空通知
                NOT_EMPTY.notify();
            }
        }

        /**
         * 从数据区取出一个商品
         */
        public T fetch() throws Exception {
            synchronized (NOT_EMPTY) {
                while (amount <= 0) {
                    Print.tcfo("队列已经空了!");
                    //等待未空通知
                    NOT_EMPTY.wait();
                }
            }

            T element = null;
            synchronized (LOCK_OBJECT) {
                if (amount > 0) {  // 加上双重检查,模拟双检锁在单例模式中应用
                    element = dataList.remove(0);
                    amount--;
                }
            }

            synchronized (NOT_FULL) {
                //发送未满通知
                NOT_FULL.notify();
            }
            return element;
        }
    }
}

那以上就是使用synchronized+wait+notify实现的线程安全的生产者-消费者模式。

虽然线程安全问题顺利解决,但是以上的解决方式使用了SafeDataBuffer的实例的对象锁作为同步锁,这样一来,所有的生产、消费动作在执行过程中都需要抢占同一个同步锁,最终的结果是所有的生产、消费动作都被串行化了。

而且在锁竞争激烈的情况下,synchronized锁会膨胀升级为重量级锁,严重的影响的程序的性能。

尼恩提示:

synchronized锁的膨胀底层原理,非常重要, 这部分内容可以阅读 《Java高并发核心编程 卷2 加强版》。

这里不做赘述。

版本3:使用信号量实现(Semaphore)

为了实现 线程安全的生产者-消费者模式实现,可以使用锁来实现。

  • 比较低级的办法是用内置锁,也就是synchronized+wait+notify方式解决
  • 更加高级一点的版本,使用显示锁 比如信号量(Semaphore)、Blockingqueue
  • 当然,终极版本是无锁编程 disruptor 的方式来解决,

当然,咱们这个小节,仅仅关注有锁版本,后面的小节,再无锁编程。

咱们就挨个来看下。那接下我们看下显示锁 的 核心成员之一 信号量(semaphore)实现线程安全的生产者-消费者模式。

什么是信号量?

信号量是Dijkstra在1965年提出的一种方法,它使用一个整型变量来累计唤醒次数,供以后使用。

在他的建议中引入了一个新的变量类型,称作信号量(semaphore)。

一个信号量的取值可以为0(表示没有保存下来的唤醒操作)或者正值(表示有一个或多个唤醒操作)。

Dijkstra建议设立两种操作:down和up(分别为一般化后的sleep和wakeup)。

对一个信号量执行down操作,则是检查其值是否大于0。若该值大于0,则将其减1(即用掉一个保存的唤醒信号)并继续;若该值为0,则进程将睡眠,而且此时down操作并未结束。

原子操作:所谓原子操作,是指一组相关联的操作要么都不间断地执行,要么不执行。

检查数值、修改变量值以及可能发生的睡眠操作,均作为一个单一的、不可分割的原子操作完成。保证一旦一个信号量操作开始,则在该操作完成或阻塞之前,其他进程均不允许访问该信号量。

这种原子性对于解决同步问题和避免竞争条件是绝对必要的。

up操作对信号量的值增1。

如果一个或多个进程在该信号量上睡眠,无法完成一个先前的down操作,则由系统选择其中的一个(如随机挑选)并允许该进程完成它的down操作。

于是,对一个有进程在其上睡眠的信号量执行一次up操作后,该信号量的值仍旧是0,但在其上睡眠的进程却少了一个。信号量的值增加1和唤醒一个进程同样也是不可分割的,不会有某个进程因执行up而阻塞,正如前面的模型中不会有进程因执行wakeup而阻塞一样。

Dijkstra论文中的信号量含义

在Dijkstra原来的论文中,他分别使用名称P和V而不是down和up,

荷兰语中,Proberen的意思是尝试,Verhogen的含义是增加或升高。

从物理上说明信号量的P、V操作的含义。

P(S)表示申请一个资源,S.value>0表示有资源可用,其值为资源的数目;S.value=0表示无资源可用;S.value<0, 则|S.value|表示S等待队列中的进程个数。

V(S)表示释放一个资源,信号量的初值应该大于等于0。P操作相当于“等待一个信号”,而V操作相当于“发送一个信号”,在实现同步过程中,V操作相当于发送一个信号说合作者已经完成了某项任务,在实现互斥过程中,V操作相当于发送一个信号说临界资源可用了。

实际上,在实现互斥时,P、V操作相当于申请资源和释放资源。

Dijkstra的解决方案使用了三个信号量:

一个称为full,用来记录充满缓冲槽数目,

一个称为empty,记录空的缓冲槽总数;

一个称为mutex,用来确保生产者和消费者不会同时访问缓冲区。

full的初值为0,empty的初值为缓冲区中槽的数目,mutex的初值为1。供两个或多个进程使用的信号量,其初值为1,保证同时只有一个进程可以进入临界区,称作二元信号量。如果每个进程在进入临界区前都执行down操作,并在刚刚退出时执行一个up操作,就能够实现互斥。

信号量的另一种用途是用于实现同步,信号量full和empty用来保证某种事件的顺序发生或不发生。在本例中,它们保证当缓冲区满的时候生产者停止运行,以及当缓冲区空的时候消费者停止运行。

对于无界缓冲区的生产者—消费者问题,两个进程共享一个不限大小的公共缓冲区。

由于是无界缓冲区(仓库是无界限制的),即生产者不用关心仓库是否满,只管往里面生产东西,但是消费者还是要关心仓库是否空。所以生产者不会因得不到缓冲区而被阻塞,不需要对空缓冲区进行管理,可以去掉在有界缓冲区中用来管理空缓冲区的信号量及其PV操作。

JUC中的信号量 Semaphore

在JUC中的信号量 Semaphore属于共享锁。Semaphore可以用来控制在同一时刻访问共享资源的线程数量,通过协调各个线程以保证共享资源的合理使用。Semaphore维护了一组虚拟许可,其数量可以通过构造器的参数指定。线程在访问共享资源前必须使用Semaphore的acquire()方法获得许可,如果许可数量为0,该线程就一直阻塞。线程访问完成资源后,必须使用Semaphore的release()方法释放许可。更形象的说法是:Semaphore是一个是许可管理器。

JUC包中Semaphore类的主要方法大致如下:

Semaphore类的主要方法大致如下:

(1) Semaphore(permits):构造一个Semaphore实例,初始化其管理的许可数量为permits参数值。

(2) Semaphore(permits,fair):构造一个Semaphore实例,初始化其管理的许可数量为permits参数值,以及是否以公平模式(fair参数是否为true)进行许可的发放。

Semaphore和ReentrantLock类似,Semaphore发放许可时有两种模式:公平模式和非公平模式,默认情况下使用非公平模式。

(3) availablePermits():获取Semaphore对象可用的许可数量。

(4) acquire():当前线程尝试获取Semaphore对象的一个许可。此过程是阻塞的,线程会一直等待Semaphore发放一个许可,直到发生以下任意一件事:

  • 当前线程获取了一个可用的许可。
  • 当前线程被中断,就会抛出InterruptedException异常,并停止等待,继续往下执行。

(5) acquire(permits) :当前线程尝试阻塞地获取permits个许可。此过程是阻塞的,线程会一直等待Semaphore发放permits个许可。如果没有足够的许可而当前线程被中断,就会抛出InterruptedException异常并终止阻塞。

(6) acquireUninterruptibly():当前线程尝试阻塞地获取一个许可,阻塞的过程不可中断,直到成功获取一个许可。

(7) acquireUninterruptibly(permits):当前线程尝试阻塞地获取permits个许可,阻塞的过程不可中断,直到成功获取permits个许可。

(8) tryAcquire():当前线程尝试获取一个许可。此过程是非阻塞的,它只是进行一次尝试,会立即返回。如果当前线程成功获取了一个许可,就返回true;如果当前线程没有获得许可,就返回false

(9) tryAcquire(permits):当前线程尝试获取permits个许可。此过程是非阻塞的,它只是进行一次尝试,会立即返回。如果当前线程成功获取了permits个许可,就返回true;如果当前线程没有获得permits个许可,就返回false。

(10) tryAcquire(timeout,TimeUnit):限时获取一个许可。此过程是阻塞的,会一直等待许可,直到发生以下任意一件事:

  • 当前线程获取了一个许可,则会停止等待,继续执行,并返回true。
  • 当前线程等待timeout后超时,则会停止等待,继续执行,并返回false。
  • 当前线程在timeout时间内被中断,则会抛出InterruptedException异常,并停止等待,继续执行。

(11) tryAcquire(permits,timeout,TimeUnit):与tryAcquire(timeout,TimeUnit)方法在逻辑上基本相同,不同之处在于:在获取许可的数量上不同,此方法用于获取permits个许可。

(12) release():当前线程释放一个可用的许可。

(13) release(permits):当前线程释放permits个可用的许可。

(14) drainPermits():当前线程获得剩余的所有可用许可。

(15) hasQueuedThreads():判断当前Semaphore对象上是否存在正在等待许可的线程。

(16) getQueueLength():获取当前Semaphore对象上正在等待许可的线程数量。

使用Semaphore实现的生产者-消费者模式

那接下来我们就看下使用Semaphore实现的生产者-消费者模式的代码,主要是针对临界区资源和临界区代码进行修改,具体修改如下:


// Created by 尼恩@疯狂创客圈.   源码来自 《Java高并发核心编程 卷2 加强版》

public class SemaphorePetStore {
    public static final int MAX_AMOUNT = 10; //数据区长度

    //共享数据区,类定义
    static class DateBuffer<T> {
        //保存数据
        private LinkedBlockingDeque<T> dataList = new LinkedBlockingDeque<>(MAX_AMOUNT);

        //保存数量
        private volatile int amount = 0;
        // 每次处理的次数
        private static final int times = 100;

        //信号量标识
        private static AtomicInteger signal = new AtomicInteger(0);


        // 向数据区增加一个元素
        public void add(T element) throws Exception {
            while (amount < times) {
                if (signal.get() >= 0 && dataList.size() == 0) {
                    synchronized (signal) {
                        //生产者: P操作 -1
                        Print.fo("生产者: P操作 -1 ");
                        signal.incrementAndGet();
                        Print.fo("生产者: 生产,放入一个对象");
                        dataList.add(element);
                        amount++;
                        //生产者: P操作 -1
                        Print.fo("生产者: V操作 +1");
                        signal.decrementAndGet();
                        Print.fo("生产者: 通知消费者,生产者阻塞");
                        signal.notifyAll();
                        // 阻塞
                        signal.wait();
                        ;
                    }
                } else {
                    Thread.sleep(10);
                }
            }
        }

        /**
         * 从数据区取出一个商品
         */
        public T fetch() throws Exception {
            T element = null;
            while (amount < times) {
                if (signal.get() <= 0 && dataList.size() > 0) {
                    synchronized (signal) {
                        //消费者: P操作 -1
                        Print.fo("消费者: P操作 -1 ");
                        signal.decrementAndGet();
                        Print.fo("消费者: 消费,取出一个对象");
                        element = dataList.take();
                        amount--;
                        //生产者: P操作 -1
                        Print.fo("消费者: V操作 +1");
                        signal.incrementAndGet();
                        Print.fo("消费者: 通知生产者,消费者阻塞");
                        signal.notifyAll();
                        // 阻塞
                        signal.wait();
                        ;
                    }
                } else {
                    Thread.sleep(10);
                }
            }
            return element;
        }
    }
}

由于其他代码未做更改,小伙伴可参考前面的线程不安全的生产者类、消费者类以及组装生产者-消费者模式的实现。

部分执行结果如下:

版本4:使用Blockingqueue 实现

回顾前面: 为了实现 线程安全的生产者-消费者模式实现,可以使用锁来实现。

  • 比较低级的办法是用内置锁,也就是synchronized+wait+notify方式解决
  • 更加高级一点的版本,使用显示锁 比如信号量(Semaphore)、Blockingqueue
  • 当然,终极版本是无锁编程 disruptor 的方式来解决,

当然,咱们这个小节,仅仅关注有锁版本,后面的小节,再无锁编程。

咱们就挨个来看下。那接下我们看下基于 显示锁 实现的核心结构 Blockingqueue实现线程安全的生产者-消费者模式。

在多线程环境中,通过BlockingQueue(阻塞队列)可以很容易地实现多线程之间数据共享和通信。

阻塞队列与普通队列(ArrayDeque等)之间的最大不同点在于阻塞队列提供了阻塞式的添加和删除方法。

(1)阻塞添加

所谓的阻塞添加是指当阻塞队列元素已满时,队列会阻塞添加元素的线程,直队列元素不满时,才重新唤醒线程执行元素添加操作。

(2)阻塞删除

阻塞删除是指在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空时,才重新唤醒删除线程再执行删除操作。

BlockingQueue的实现类有ArrayBlockingQueue、DelayQueue、LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,具体如下:

ArrayBlockingQueue是一个常用的阻塞队列,是基于数组实现的,其内部使用一个定长数组存储元素。除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整型变量,分别标识队列的头部和尾部在数组中的位置。ArrayBlockingQueue的添加和删除操作都是共用同一个锁对象,由此意味着添加和删除无法并行运行,这一点不同于LinkedBlockingQueue。ArrayBlockingQueue完全可以将添加和删除的锁分离,从而添加和删除操作完全并行。Doug Lea之所以没有这样去做,是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧。

LinkedBlockingQueue是基于链表的阻塞队列,其内部也维持着一个数据缓冲队列(该队列由一个链表构成)。LinkedBlockingQueue对于添加和删除元素分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

DelayQueue中的元素只有当其指定的延迟时间到了,才能从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中添加数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。DelayQueue使用场景较少,但是相当巧妙,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。

基于优先级的阻塞队列和DelayQueue类似,PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。在使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。

相对于有缓冲的阻塞队列(如LinkedBlockingQueue)来说,SynchronousQueue少了中间缓冲区(如仓库)的环节。如果有仓库,生产者直接把商品批发给仓库,不需要关心仓库最终会将这些商品发给哪些消费者,由于仓库可以中转部分商品,总体来说有仓库进行生产和消费的吞吐量高一些。反过来说,又因为仓库的引入,使得商品从生产者到消费者中间增加了额外的交易环节,单个商品的及时响应性能可能会降低,所以对单个消息的响应要求高的场景可以使用SynchronousQueue。声明一个SynchronousQueue有两种不同的方式:公平模式和非公平模式。公平模式的SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略。非公平模式(默认情况)的SynchronousQueue采用非公平锁,同时配合一个LIFO堆栈(TransferStack内部实例)来管理多余的生产者和消费者。对于后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现线程饥渴的情况,即可能出现某些生产者或者消费者的数据永远都得不到处理。

了解完阻塞队列的基本方法、主要类型之后,下面通过ArrayBlockingQueue队列实现一个生产者-消费者的案例。

具体的代码在前面的生产者和消费者实现基础上进行迭代——Consumer(消费者)和Producer(生产者)通过ArrayBlockingQueue队列获取和添加元素。其中,消费者调用了take()方法获取元素,当队列没有元素就阻塞;生产者调用put()方法添加元素,当队列满时就阻塞。通过这种方式便实现生产者-消费者模式,比直接使用等待唤醒机制或者Condition条件队列更加简单。基于ArrayBlockingQueue的生产者和消费者实现版本具体的UML类图如下:

出于“分离变与不变”的原则,此版本的Producer(生产者)、Consumer(消费者)等的逻辑不用变化,直接复用前面原的代码即可。此版本DataBuffer(共享数据区)需要变化,使用一个ArrayBlockingQueue用于缓存数据,具体的代码如下:

// Created by 尼恩@疯狂创客圈.   源码来自 《Java高并发核心编程 卷2 加强版》

public class ArrayBlockingQueuePetStore {

    public static final int MAX_AMOUNT = 10; //数据区长度


    //共享数据区,类定义
    static class DateBuffer<T> {
        //保存数据
        private ArrayBlockingQueue<T> dataList = new ArrayBlockingQueue<>(MAX_AMOUNT);


        // 向数据区增加一个元素
        public void add(T element) throws Exception {
            dataList.put(element);
        }

        /**
         * 从数据区取出一个商品
         */
        public T fetch() throws Exception {
            return dataList.take();
        }
    }
}

运行程序,部分执行结果如下:

锁的代价

锁提供了互斥,并能够确保变化能够以一个确定的顺序让其它的线程看见。

锁其实是很昂贵的,因为他们在竞争的时候需要进行仲裁。这个仲裁会涉及到操作系统的上下文切换,操作系统会挂起所有在等待这把锁的线程,直到锁持有者释放该锁。

上下文切换期间,执行线程会丧失对操作系统的控制,导致执行线程的执行上下文丢失之前缓存的数据和指令集,这会给现代处理器带来严重的性能损耗。

当然效率更高的用户态锁是另一种选择,但用户锁只有在没有竞争的时候才真正会带来益处。

注:因为用户态的锁往往是通过自旋锁来实现(或者带休眠的自旋锁),而自旋在竞争激烈的时候开销是很大的(一直在消耗CPU资源)。

网上有小伙伴为了进行效果验证,写了一个很简单程序,就是调用一个循环5亿次递增操作的函数。

这个java函数在单线程,2.4G Intel Westmere EP的CPU上只需要300ms。

一旦引入锁,即使没有发生竞争,程序的执行时间也会发生显著的增加。

循环5亿次递增操作 实验结果如下:

Method Time (ms)
Single thread 300
Single thread with lock 10,000
Two threads with lock 224,000
Single thread with CAS 5,700
Two threads with CAS 30,000
Single thread with volatile write 4,700

CAS的代价

无锁编程 场景中, 线程之间的协调 主要使用 CAS的机制。 但是 从上面的实验看到, CAS也是有代价的。

为啥呢?

CAS依赖于处理器的支持,当然大部分现代处理器都支持。

CAS相对于锁是非常高效的,因为它不需要涉及内核上下文切换进行仲裁。

但cas并不是免费的,处理器需要对CPU指令pipeline加锁以确保原子性,

并且cas只保证原子性,不保证可见性,所以cas一般和 volatile内存屏障一起使用,以确保对其他线程的可见性。

尼恩备注:cas+ volatile内存屏障 的底层原理,非常重要

如果大家对 cas+ volatile内存屏障 的知识不清楚, 请细致阅读尼恩 《Java高并发核心编程 卷2 》, 这本书做了非常详细的介绍。

版本5:无锁实现生产者-消费者模式版本

回顾前面: 为了实现 线程安全的生产者-消费者模式实现,可以使用锁来实现。

  • 比较低级的办法是用内置锁,也就是synchronized+wait+notify方式解决
  • 更加高级一点的版本,使用显示锁 比如信号量(Semaphore)、Blockingqueue
  • 当然,终极版本是无锁编程 disruptor 的方式来解决,

咱们就挨个来看下无锁版本。

为了提升性能,需要使用 CAS实现生产者、消费者。

从实操的角度来说,CAS的一个问题就是太复杂了,本来用锁进行并发编程就已经很头疼了,用CAS来实现复杂逻辑就更头痛了。

但有一个好消息是,目前有一个现成的Disruptor框架,它已经帮助我们实现了这一个功能。

Disruptor框架的简单介绍

Disruptor框架有着1000W ops性能,非常复杂的底层原理,光介绍清楚这个框架,尼恩的 《Disruptor 红宝书》 PDF 电子书就有100多页。

具体的Disruptor框架底层原理,请参见尼恩的 《Disruptor 红宝书》 PDF ,和《100Wqps日志平台实操》视频。

这里仅仅对这个 框架进行简单介绍。

Disruptor框架是由LMAX公司开发的一款高效的无锁内存队列。

它使用无锁的方式实现了一个环形队列(RingBuffer),非常适合实现生产者-消费者模式,比如事件和消息的发布。

目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。

Disruptor框架别出心裁地使用了环形队列来代替普通线形队列,这个环形队列内部实现为一个普通的数组。

对于一般的队列,势必要提供队列同步head和尾部tail两个指针,用于出队和入队,这样无疑增加了线程协作的复杂度。但如果队列是环形的,则只需要对外提供一个当前位置cursor,利用这个指针既可以进行入队操作,也可以进行出队操作。由于环形队列的缘故,队列的总大小必须事先指定,不能动态扩展。为了能够快速从一个序列(sequence)对应到数组的实际位置(每次有元素入队,序列就加1),Disruptor框架要求我们必须将数组的大小设置为2的整数次方。这样通过sequence &(queueSize-1)就能立即定位到实际的元素位置index,这比取余(%)操作快得多。

如果queueSize是2的整数次幂,则这个数字的二进制表示必然是10、100、1000、10000等形式。因此,queueSize-1的二进制则是一个全1的数字。因此它可以将sequence限定在queueSize-1范围内,并且不会有任何一位是浪费的。

RingBuffer的结构如下:

其实质只是一个普通的数组,只是当放置数据填充满队列(即到达2^n-1位置)之后,再填充数据,就会从0开始,覆盖之前的数据,于是就相当于一个环。

RingBuffer的指针(Sequence)属于一个volatile变量,同时也是我们能够不用锁操作就能实现Disruptor的原因之一,而且通过缓存行补充,避免伪共享问题。 该所谓指针是通过一直自增的方式来获取下一个可写或者可读数据,该数据是Long类型,不用担心会爆掉。有人计算过: long的范围最大可以达到9223372036854775807,一年365 * 24 * 60 * 60 = 31536000秒,每秒产生1W条数据,也可以使用292年。

Disruptor 不像传统的队列,分为一个队头指针和一个队尾指针,而是只有一个角标(上面的seq),

在Disruptor中生产者分为单生产者和多生产者,在枚举类ProducerType中定义单生产(SINGLE)和多生产(MULTI)。而消费者并没有区分。

单生产者情况下,就是普通的生产者向RingBuffer中放置数据,消费者获取最大可消费的位置,并进行消费。单生产者线程写数据的流程比较简单,具体如下:

(1)申请写入m个元素;

(2)若是有m个元素可以入,则返回最大的序列号。这儿主要判断是否会覆盖未读的元素;

(3)若是返回的正确,则生产者开始写入元素。

采用多生产者时,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor的解决方法是,每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可。

但是又会碰到新问题:如何防止读取的时候,读到还未写的元素。那么Disruptor引入了一个跟RingBuffer同样大小的Buffer,称为AvailableBuffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。多生产者流程如下:

(1)申请写入m个元素;

(2)若是有m个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间;

(3)生产者写入元素,写入元素的同时设置available Buffer里面相应的位置,以标记自己哪些位置是已经写入成功的。

那么生产者和消费者模式在RingBuffer上的情况如下

生产者向缓冲区中写入数据,而消费者从中读取数据。生产者写入数据时,使用CAS操作,消费者读取数据时,为了防止多个消费者处理同一个数据,也使用CAS操作进行数据保护。

ArrayBlockingQueue和Disruptor 的性能PK

参考文献中,有小伙伴选取了Doug Lea的ArrayBlockingQueue的实现作为参考目标进行测试,ArrayBlockingQueue是所有有界队列中性能最好的,测试是按照阻塞的方式进行的。

  

下表展示了总共处理5亿条消息时每秒吞吐量的性能测试结果,

测试环境为:没有HT的1.6.0_25 64-bit Sun JVM, Windows 7, Intel Core i7 860 @ 2.8 GHz ,以及Intel Core i7-2720QM, Ubuntu 11.04。

我们取了最好的前三条结果,这个结果使用于任何JVM运行环境,表中显示的结果并不是我们发现最好的结果。

Nehalem 2.8Ghz – Windows 7 SP1 64-bit Sandy Bridge 2.2Ghz – Linux 2.6.38 64-bit
ABQ Disruptor ABQ Disruptor
Unicast: 1P – 1C 5,339,256 25,998,336 4,057,453 22,381,378
Pipeline: 1P – 3C 2,128,918 16,806,157 2,006,903 15,857,913
Sequencer: 3P – 1C 5,539,531 13,403,268 2,056,118 14,540,519
Multicast: 1P – 3C 1,077,384 9,377,871 260,733 10,860,121
Diamond: 1P – 3C 2,113,941 16,143,613 2,082,725 15,295,197

无论在linux 环境在是在windows 环境, 无论 是多个生产者还是单个生产者, Disruptor 的性能稳稳的都在 1000W ops 以上。

基于Disruptor的实现100W ops+ 生产者和消费者设计

基于Disruptor的高性能生产者和消费者模式的类图如下:

MsgEven 是存放数据对象的载体,具体代码如下:

public class MsgEven {

    private IGoods goods;

    public IGoods getGoods() {
        return goods;
    }

    public void setGoods(IGoods goods) {
        this.goods = goods;
    }
}

消费者的作用是读取数据进行处理。

这里,数据的读取已经由Disruptor框架进行封装了,onEvent()方法为框架的回调方法。

因此,只需要简单地进行数据处理即可。

具体代码如下:

public class Consumer  implements EventHandler<MsgEven> {

    //消费的时间间隔,默认等待100毫秒
    public static final int CONSUME_GAP = 100;


    //消费者对象编号
    static final AtomicInteger CONSUMER_NO = new AtomicInteger(1);

    //消费者名称
    String name;


    public Consumer() {
        name = "消费者-" + CONSUMER_NO.incrementAndGet();
    }


    @Override
    public void onEvent(MsgEven msgEven, long sequence, boolean endOfBatch)  {
  
        Print.tcfo("消费者中:"+sequence+"商品信息:"+msgEven.getGoods());
    }
}

需要一个产生MsgEven 对象的工厂类GoodsFactory。

它会在Disruptor框架系统初始化时,构造所有的缓冲区中的对象实例,具体代码如下:

public class GoodsFactory implements EventFactory<MsgEven> {
    @Override
    public MsgEven newInstance() {
        return new MsgEven();
    }
}

生产者需要一个RingBuffer的引用,也就是环形缓冲区。

它有一个重要的方法add()将产生的数据推入缓冲区。方法add()接收一个IGood对象。

add()方法的功能就是将传入的IGood对象中的数据提取出来,并装载到环形缓冲区中。

具体代码如下:

public class Produer {

    //生产者对象编号
    static final AtomicInteger PRODUCER_NO = new AtomicInteger(1);

    //生产者名称
    String name = null;



    private  final RingBuffer<MsgEven> ringBuffer ;

    public Produer(RingBuffer<MsgEven> ringBuffer) {
        name = "生产者-" + PRODUCER_NO.incrementAndGet();
        this.ringBuffer = ringBuffer;
    }

    public  void add(IGoods goods){
        // 1.ringBuffer 事件队列 下一个槽
        long sequence = ringBuffer.next();
        try {
            //2.取出空的事件队列
            MsgEven    msgEven= ringBuffer.get(sequence);
            msgEven.setGoods(goods);
            //3.获取事件队列传递的数据
            Print.cfo("生产者名称:"+name+",生产商品:"+goods.toString());
        }finally {
            //4.发布事件
            ringBuffer.publish(sequence);
        }
    }
}

我们的生产者、消费者和数据都已经准备就绪,只差一个统筹规划的主函数将所有的内容整合起来。具体代码如下:

public class DisruptorPetStore {
    public static void main(String[] args) throws InterruptedException {
        // 1.创建工厂
        GoodsFactory dateBufferFactory= new GoodsFactory();
        //2.创建ringBuffer 大小,大小一定要是2的N次方
        int bufferSize=1024*1024;

        //3.创建Disruptor
        Disruptor<MsgEven>  disruptor = new Disruptor<MsgEven>(dateBufferFactory,bufferSize, Executors.defaultThreadFactory(),ProducerType.MULTI,new BlockingWaitStrategy());
        //4.设置事件处理器 即消费者
        disruptor.handleEventsWith(new Consumer());
        // 5.启动
        disruptor.start();

        // 6.创建RingBuffer容器
        RingBuffer<MsgEven> ringBuffer= disruptor.getRingBuffer();

        //7.创建生产者
        Produer produer = new Produer(ringBuffer);

        for (int l=0;true;l++){
            IGoods goods = Goods.produceOne();
            produer.add(goods);
            Thread.sleep(100);

        }
    }
}

部分执行结果如下:

学习生产者-消费者模式学习的思想, 消息队列、缓存中也有生产者-消费者模式的思想。

尼恩总结

生产者、消费者,是一道高频的面试题,非常高频,也非常考验水平。

如果按照上面的套路去作答, 无论是美团,还是华为,或者其他的大厂面试官,都会对你 献上膝盖。

如果面试过程中, 遇到什么问题,可以来 《技术自由圈》 社群交流。

作者介绍:

本文1作: 唐欢,资深架构师, 《Java 高并发核心编程 加强版 》作者之1 。

本文2作: 尼恩,40岁资深老架构师, 《Java 高并发核心编程 加强版 卷1、卷2、卷3》创世作者, 著名博主 。 《K8S学习圣经》《Docker学习圣经》等11个PDF 圣经的作者。

相关的面试题

最后,尼恩再给大家来几道相关的面试题

聊聊:如何写代码来解决生产者消费者问题?

在现实中你解决的许多线程问题都属于生产者消费者模型,就是一个线程生产任务供其它线程进行消费,你必须知道怎么进行线程间通信来解决这个问题。

比较低级的办法是用wait和notify来解决这个问题,比较赞的办法是用Semaphore 或者 BlockingQueue来实现生产者消费者模型。

聊聊:什么是竟态条件?

在大多数实际的多线程应用中,两个或两个以上的线程需要共享对同一数据的存取。

如果i线程存取相同的对象,并且每一个线程都调用了一个修改该对象状态的方法,将会发生什么呢?

可以想象,线程彼此踩了对方的脚。

根据线程访问数据的次序,可能会产生讹误的对象。这样的情况通常称为竞争条件。

聊聊:Java中Semaphore是什么?

Java中的Semaphore是一种新的同步类,它是一个计数信号。

从概念上讲,从概念上讲,信号量维护了一个许可集合。

如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。

每个 release()添加一个许可,从而可能释放一个正在阻塞的获取者。

但是,不使用实际的许可对象,Semaphore只对可用许可的号码进行计数,并采取相应的行动。

信号量常常用于多线程的代码中,比如数据库连接池。

聊聊:java中wait()的核心原理是什么?

1)当线程调用了locko(某个同步锁对象)的wait()方法后,JVM会将当前线程加入locko监视器的WaitSet(等待集),等待被其他线程唤醒。
2)当前线程会释放locko对象监视器的Owner权利,让其他线程可以抢夺locko对象的监视器。
3)让当前线程等待,其状态变成WAITING。

聊聊:java中的notify()的核心原理是什么?

1)当线程调用了locko(某个同步锁对象)的notify()方法后,JVM会唤醒locko监视器WaitSet中的第一个等待线程。

2)当线程调用了locko的notifyAll()方法后,JVM会唤醒locko监视器WaitSet中的所有等待线程。

3)等待线程被唤醒后,会从监视器的WaitSet移动到EntryList,线程具备了排队抢夺监视器Owner权利的资格,其状态从WAITING变成BLOCKED。

4)EntryList中的线程抢夺到监视器Owner权利之后,线程的状态从BLOCKED变成Runnable,具备重新执行的资格。

聊聊:谈一下synchronized的作用

synchronized 关键字主要用来解决的是多线程同步问题,其可以保证在被其修饰的代码任意时
刻只有一个线程执行。

视情况而定,(主动)说出它的用法及底层实现原理(使用的是
moniterenter 和 moniterexit指令…)。

聊聊:同时访问synchronized的静态和非静态方法,能保证线程安全吗?

不能,两者的锁对象不一样。

前者是类锁(XXX.class), 后者是this

聊聊:同时访问synchronized方法和非同步方法,能保证线程安全吗?

不能,因为synchronized只会对被修饰的方法起作用。

聊聊:两个线程同时访问两个对象的非静态同步方法能保证线程安全吗?

不能,每个对象都拥有一把锁。两个对象相当于有两把锁,导致锁对象不一致。(PS:如果
是类锁,则所有对象共用一把锁)

聊聊:若synchronized方法抛出异常,会导致死锁吗?

JVM会自动释放锁,不会导致死锁问题。

聊聊:若synchronized的锁对象能为空吗?会出现什么情况?

锁对象不能为空,否则抛出NPE(NullPointerException)

聊聊:synchronized的继承性问题

重写父类的synchronized的方法,主要分为两种情况:

(1)子类的方法没有被synchronized修饰:

synchronized的不具备继承性。所以子类方法是线程不安全的。

(2)子类的方法被synchronized修饰

两个锁对象其实是一把锁,而且是子类对象作为锁。

这也证明了: synchronized的锁是可重入锁。否则将出现死锁问题。

聊聊:如何在两个线程间共享数据?

可以通过共享对象来实现这个目的,或者是使用像阻塞队列这样并发的数据结构。

聊聊:Java中notify 和 notifyAll有什么区别?

因为多线程可以等待单监控锁,Java API 的设计人员提供了一些方法当等待
条件改变的时候通知它们,但是这些方法没有完全实现。notify()方法不能唤醒某个具体的线程,所以只有一个线程在等待的时候它才有用武之地。而notifyAll()唤醒所有线程并允许他们争夺锁确保了至少有一个线程能继续运行。

聊聊:为什么wait, notify 和 notifyAll这些方法不在thread类里面?

一个很明显的原因是JAVA提供的锁是对象级的而不是线程级的,每个对象都有锁,通过线程获得。如果线程需要等待某些锁那么调用对象中的wait()方法就有意义了。如果wait()方法定义在Thread类中,线程正在等待的是哪个锁就不明显了。简单的说,由于wait,notify和notifyAll都是锁级别的操作,所以把他们定义在Object类中因为锁属于对象。

聊聊:为什么wait和notify方法要在同步块中调用?

当一个线程需要调用对象的wait()方法的时候,这个线程必须拥有该对象的锁,接着它就会释放这个对象锁并进入等待状态直到其他线程调用这个对象上的notify()方法。同样的,当一个线程需要调用对象的notify()方法时,它会释放这个对象的锁,以便其他在等待的线程就可以得到这个对象锁。由于所有的这些方法都需要线程持有对象的锁,这样就只能通过同步来实现,所以他们只能在同步方法或者同步块中被调用。如果你不这么做,代码会抛出IllegalMonitorStateException异常。

什么是 Disruptor?它有哪些特点?

Disruptor 是一个高性能的无锁并发框架,用于解决在多线程场景下的数据共享和通信问题。它采用了环形缓冲区(RingBuffer)来存储事件,并通过控制序列(Sequence)来实现线程间的协作。Disruptor 的特点包括:高性能、低延迟、可扩展、无锁化等。

Disruptor 如何实现无锁并发?它的核心原理是什么?

Disruptor 实现无锁并发的核心原理是使用了 CAS(Compare And Swap)算法以及 volatile 变量保证数据的原子性和可见性。此外,Disruptor 还使用了基于序列的技术来实现线程间的协作。

Disruptor 的优缺点是什么?

Disruptor 的优点包括高性能、低延迟、可扩展、无锁化等。
缺点包括学习成本较高、应用场景相对局限等。

Disruptor 中的 RingBuffer 是什么?有哪些作用?

RingBuffer 是 Disruptor 中的核心组件之一,它是一个环形的缓冲区,用于存储事件。RingBuffer 的作用包括存储数据、实现高并发以及提供线程间的协作。

Disruptor 中的 Sequence 是什么?有哪些作用?

Sequence 是 Disruptor 中的另一个核心组件,它是一个单调递增的序列,用于控制事件的发布和消费。Sequence 的作用包括控制事件的发布和消费、实现线程间的协作等。

Disruptor 中的 EventProcessor 是什么?有哪些作用?

EventProcessor 是 Disruptor 中的一个概念,它主要负责处理事件,并将事件从 RingBuffer 中取出来进行处理。EventProcessor 的作用包括实现事件的处理、实现线程间的协作等。

Disruptor 中的 WaitStrategy 是什么?有哪些类型?

WaitStrategy 是 Disruptor 中用于实现线程间协作的策略之一,它主要决定了消费者在没有可用事件时如何等待。Disruptor 提供了多种 WaitStrategy,包括 BlockingWaitStrategy、BusySpinWaitStrategy、LiteBlockingWaitStrategy 等。

Disruptor 的多生产者模型和多消费者模型分别是什么?如何实现?

Disruptor 支持多生产者和多消费者模型。多生产者模型指的是多个生产者往 RingBuffer 中写入数据,多消费者模型指的是多个消费者从 RingBuffer 中读取数据。实现多生产者和多消费者模型需要使用不同的序列来控制并发操作。

Disruptor 如何保证数据的顺序性?

Disruptor 通过使用控制序列(Sequence)来实现数据的顺序性。在生产者往 RingBuffer 中写入数据时,会更新生产者的序列;在消费者从 RingBuffer 中读取数据时,会更新消费者的序列。通过对序列的控制,可以保证事件的顺序性。

Disruptor 如何保证数据的可见性?

Disruptor 使用 volatile 变量和 CAS 算法来保证数据的可见性。

在生产者往 RingBuffer 中写入数据时,会使用 volatile 变量来确保数据的可见性;在消费者从 RingBuffer 中读取数据时,会使用 CAS 算法来保证对序列的原子性和可见性,从而保证数据的可见性。

Disruptor 的实现原理是什么?

Disruptor 实现原理主要包括以下几个方面:

  • 使用环形缓冲区(RingBuffer)存储事件
  • 使用控制序列(Sequence)控制事件的发布和消费
  • 使用多生产者和多消费者模型提高并发性能
  • 使用无锁算法保证线程安全等。

Disruptor 与传统的线程池有什么区别?

Disruptor 和传统的线程池相比,具有更高的并发性能和更低的延迟。

这是因为 Disruptor 使用了无锁算法和基于序列的技术来实现数据共享和通信,避免了线程间的互斥和同步操作,从而提高了并发性能,并且由于没有线程切换的开销,也可以降低延迟。

Disruptor 如何处理异常?

Disruptor 中的异常处理需要由应用程序自己实现。

一般来说,可以在 EventProcessor 中捕获异常,并进行相应的处理。如果不进行处理,异常可能会导致整个系统崩溃。

聊聊:Disruptor 适用于哪些场景?

Disruptor 适用于需要高性能、低延迟、大规模并发、对数据顺序有要求等场景,例如高频交易系统、大规模数据处理系统、实时消息系统等。

聊聊:Disruptor 如何进行性能测试?

Disruptor 的性能测试可以使用 JMH(Java Microbenchmark Harness)框架进行。

常见的性能测试包括吞吐量测试、延迟测试、竞争测试等。

题外话:JMH 是什么?

JMH:即(Java Microbenchmark Harness),它是由 Java 官方团队开发的一款用于 Java 微基准测试工具。

基准测试:是指通过设计科学的测试方法、测试工具和测试系统,实现对一类测试对象的某项性能指标进行定量的和可对比的测试。

比如鲁大师、安兔兔,都是按一定的基准或者在特定条件下去测试某一对象的的性能,比如显卡、IO、CPU 之类的。

JMH 官网:http://openjdk.java.net/projects/code-tools/jmh

JMH 和 JMeter 的不同?

JMeter 更多的是对 rest api 进行压测,而 JMH 关注的粒度更细,它更多的是发现某块性能槽点代码,然后对优化方案进行基准测试对比。比如 json 序列化方案对比,bean copy 方案对比等。

聊聊:Disruptor 如何保证线程安全?

Disruptor 使用了无锁算法和基于序列的技术来保证线程安全。

具体而言,它使用 CAS 算法和 volatile 变量来保证数据的原子性和可见性,使用控制序列来实现线程间的协作,避免了线程间的互斥和同步操作,从而保证线程安全。

聊聊:Disruptor 中的 Sequence Barrier 是什么?有哪些作用?

Sequence Barrier 是 Disruptor 中用于实现线程间协作的组件之一,它主要负责控制消费者的读取进度,并阻塞消费者直到事件可用。

Sequence Barrier 的作用包括实现线程间的协作、控制消费者的读取进度、提高并发性能等。

聊聊:Disruptor 中的 BatchEventProcessor 是什么?有哪些作用?

BatchEventProcessor 是 Disruptor 中的一个概念,它是一个高性能的事件处理器,可以批量地处理事件。

BatchEventProcessor 的作用包括实现事件的处理、实现线程间的协作等。

聊聊:Disruptor 如何解决数据竞争问题?

Disruptor 使用无锁算法和基于序列的技术来避免数据竞争问题。

具体而言,它使用 CAS 算法和 volatile 变量来保证数据的原子性和可见性,使用控制序列来实现线程间的协作,从而避免了线程间的互斥和同步操作,避免了数据竞争问题。

聊聊:Disruptor 中的 EventTranslator 是什么?有哪些作用?

EventTranslator 是 Disruptor 中实现事件发布的一种方式,它可以将用户定义的数据转换为 Disruptor 需要的事件格式,并发布到 RingBuffer 中。

EventTranslator 的作用包括简化事件发布的流程、提高代码的可读性等,同时还可以提高性能,因为它可以避免在生产者线程和 RingBuffer 之间进行对象复制的操作。

通过 EventTranslator,用户可以自定义事件,并将其发布到 RingBuffer 中,让消费者线程进行处理。

聊聊:Disruptor 如何处理高并发场景下的竞争问题?

Disruptor 通过使用 RingBuffer 的多个序号来实现消费者和生产者之间的解耦。

RingBuffer 的每个序号都对应着一个槽位,生产者将事件发布到槽位中,并更新序号。

消费者根据各自的需要获取槽位中的事件,并更新对应的序号。

这种方式避免了生产者和消费者之间的竞争,从而提高了系统性能。

聊聊:Disruptor 中的 TimeoutBlockingWaitStrategy 是什么?有哪些作用?

TimeoutBlockingWaitStrategy 是一种等待策略,它会在一定的时间内进行等待,并且在超时后抛出异常。

这种等待策略可以在需要限制等待时间的场景中使用,例如在生产者线程向 RingBuffer 发布事件时,限制等待时间可以保证生产者不会一直阻塞在 RingBuffer 上,从而避免系统出现死锁等问题。

聊聊:Disruptor 如何实现有界队列?

Disruptor 使用 RingBuffer 来实现有界队列,RingBuffer 的容量就是队列的大小。

当 RingBuffer 的可用空间被填满后,新的事件将无法继续发布,从而实现了对队列大小的控制。

聊聊:Disruptor 在实际项目中的应用有哪些经验?

Disruptor 主要用于解决高性能的并发编程问题,例如在金融领域的交易系统、广告系统中的实时数据处理等场景。

在实际应用中,需要根据具体的业务需求来选择相应的策略和配置参数。

聊聊:Disruptor 的性能瓶颈在哪里?如何避免?

Disruptor 的性能瓶颈主要在于 RingBuffer 的读写操作和事件处理器的逻辑处理。

为了避免性能瓶颈,可以使用合适的等待策略、线程池大小和事件处理器数量等策略,并且尽可能避免在事件处理器中进行复杂的计算或者 IO 操作。

聊聊:Disruptor 和 Kafka 中的消息队列相比有哪些异同?

Disruptor 和 Kafka 都是高性能的消息队列,但是它们的设计目标和应用场景有所不同。

Disruptor 的设计目标是提供一个非常快速、可预测的内存消息传递机制,用于实现高性能的并发编程。

而 Kafka 的设计目标是建立一个高可靠、可扩展、持久化的分布式消息队列,用于实现大规模数据的流式处理。

聊聊:Disruptor 是否适用于分布式系统?

Disruptor 是一种本地内存消息传递机制,不适用于分布式系统。

如果需要在分布式环境中使用 Disruptor,可以考虑使用类似于 Kafka 的分布式消息队列来代替。

聊聊:Disruptor 是否适用于对数据一致性要求较高的场景?

Disruptor 是一种内存模型,在单个 JVM 中提供了高效且可预测的消息传递机制,因此适用于对数据一致性要求较高的场景。

但是,如果需要实现跨进程或者跨机器的高一致性需求,需要考虑使用分布式锁等更为复杂的机制。

聊聊:Disruptor 是否适用于内存受限的场景?

Disruptor 在内存使用上比较高效,但如果在内存受限的场景下,需要根据具体情况进行评估。

Disruptor 的内存消耗主要来自于 RingBuffer 和事件对象本身,可以通过减小 RingBuffer 大小和优化事件对象等方式来降低内存消耗。

同时,也可以考虑使用压缩算法等方式来减小事件对象的大小。因此,在内存受限的场景下,需要结合具体的业务需求和系统限制来进行评估。

聊聊:Disruptor 是否适用于长时间阻塞的场景?

Disruptor 不适用于长时间阻塞的场景。

Disruptor 的设计初衷是为了解决高并发、低延迟的场景,即在处理速度和吞吐量方面具有优势。

在 Disruptor 中,生产者和消费者之间通过环形缓冲区进行数据交换,但如果某个消费者在处理某个事件时出现了长时间阻塞,则会影响其他消费者的处理效率,降低整个系统的性能。

如果需要处理长时间阻塞的任务,可以考虑使用线程池等方式来解决。

同时,也可以对 Disruptor 进行扩展,实现类似于超时机制的功能,当某个消费者的处理时间超过一定阈值时,自动放弃该事件的处理,避免长时间阻塞对整个系统的影响。

参考文献:

https://www.cnblogs.com/daoqidelv/p/7043696.html

清华大学出版社《Java高并发核心编程 卷2 加强版》

《队列之王 Disruptor 红宝书》

技术自由的实现路径:

实现你的 架构自由:

吃透8图1模板,人人可以做架构

10Wqps评论中台,如何架构?B站是这么做的!!!

阿里二面:千万级、亿级数据,如何性能优化? 教科书级 答案来了

峰值21WQps、亿级DAU,小游戏《羊了个羊》是怎么架构的?

100亿级订单怎么调度,来一个大厂的极品方案

2个大厂 100亿级 超大流量 红包 架构方案

… 更多架构文章,正在添加中

实现你的 响应式 自由:

响应式圣经:10W字,实现Spring响应式编程自由

这是老版本 《Flux、Mono、Reactor 实战(史上最全)

实现你的 spring cloud 自由:

Spring cloud Alibaba 学习圣经》 PDF

分库分表 Sharding-JDBC 底层原理、核心实战(史上最全)

一文搞定:SpringBoot、SLF4j、Log4j、Logback、Netty之间混乱关系(史上最全)

实现你的 linux 自由:

Linux命令大全:2W多字,一次实现Linux自由

实现你的 网络 自由:

TCP协议详解 (史上最全)

网络三张表:ARP表, MAC表, 路由表,实现你的网络自由!!

实现你的 分布式锁 自由:

Redis分布式锁(图解 - 秒懂 - 史上最全)

Zookeeper 分布式锁 - 图解 - 秒懂

实现你的 王者组件 自由:

队列之王: Disruptor 原理、架构、源码 一文穿透

缓存之王:Caffeine 源码、架构、原理(史上最全,10W字 超级长文)

缓存之王:Caffeine 的使用(史上最全)

Java Agent 探针、字节码增强 ByteBuddy(史上最全)

实现你的 面试题 自由:

4000页《尼恩Java面试宝典 》 40个专题

以上尼恩 架构笔记、面试题 的PDF文件,请到《技术自由圈》公众号领取

免费领取11个技术圣经PDF: