《Java 并发编程的艺术》实验03 等待通知机制的实现

发布时间 2023-10-31 16:31:10作者: Ba11ooner

等待通知机制的实现

需求分析

实验内容

设计一个任务队列,多个任务线程同时从队列中取出任务进行处理。当任务队列为空时,任务线程需要进入等待状态,直到新的任务到达。当有新的任务到达时,需要通知其中一个任务线程来处理新任务。

基于 Object Monitor 实现

实验目的

学习如何使用等待通知机制来实现线程间的协作。具体地说,我们将通过基于Object Monitor实现的等待通知机制来实现线程的等待和唤醒(wait-notify)。

实验过程
  1. 定义一个任务队列类TaskQueue,其中包含以下几个方法:
    • addTask(task): 将新任务加入到队列中。
    • getTask(): 从队列中取出一个任务。
    • wait(): 当队列为空时,消费者线程需要调用此方法进入等待状态。封装在 getTask() 内部
    • notify(): 当加入新任务时,生产者线程调用此方法来通知一个消费者线程来处理新任务。封装在 addTask() 内部
  2. 创建多个任务线程Consumer,它们会不断地调用getTask()来获取任务进行处理。当队列为空时,消费者线程需要调用wait()进入等待状态。
  3. 创建一个生产任务的线程Producer,它会在队列中添加新任务,并调用notify()方法来通知一个任务线程有新任务可供处理。
示例代码
public class ObjectMonitorDemo {
    //region 任务队列
    static class TaskQueue {
        private List<String> tasks = new ArrayList<>();

        //synchronized 保证插入的并发安全性
        public synchronized void addTask(String task) {
            tasks.add(task);
            //唤醒因为队列为空而等待的线程
            //为保证尽可能多的交替运行,使用 notify()
            //因为 notify() 方法将随机选择等待该对象锁的一个线程进行唤醒,其他线程仍将保持等待状态。
            //notify() 方法的线程间竞争相对较激烈,因为只有一个线程会被唤醒,而其他线程仍处于等待状态。
            //竞争激烈的情况下,随机性是主导因素,此时更容易按照 JVM 的伪随机分配性质更大概率地产生交替运行
            notify();
            //为保证尽可能少的交替运行,使用 notifyAll()
            //因为 notifyAll() 方法会唤醒所有等待该对象锁的线程。
            //而 notifyAll() 方法的线程间竞争较为均衡,因为它会唤醒所有等待的线程,这些线程将争夺锁资源。
            //竞争均衡的情况下,性能优化是主导因素,此时更容易根据性能优化,保证尽可能少的锁释放,进而导致更小概率的交替运行
            //notifyAll();
        }

        //synchronized 保证获取的并发安全性
        public synchronized String getTask() throws InterruptedException {
            //当任务队列为空时,等待
            while (tasks.isEmpty()) {
                wait();
            }
            return tasks.remove(0);
        }
    }
    //endregion

    //region 消费者线程
    static class Consumer extends Thread {
        private TaskQueue queue;
        private int threadId;

        public Consumer(TaskQueue queue, int id) {
            this.queue = queue;
            this.threadId = id;
        }

        public int getThreadId() {
            return threadId;
        }

        @Override
        public void run() {
            try {
                //持续从队列中读取任务
                while (true) {
                    String task = queue.getTask();
                    // 处理任务
                    System.out.println("Consumer-" + threadId + ": processing " + task);
                }
            } catch (InterruptedException e) {
                System.out.println("Consumer-" + threadId + " 已关闭");
                //e.printStackTrace();
            }
        }
    }
    //endregion

    //region 生产者线程
    static class Producer extends Thread {
        private TaskQueue queue;
        private List<Consumer> consumers;


        public Producer(TaskQueue queue, List<Consumer> consumerList) {
            this.queue = queue;
            this.consumers = consumerList;
        }

        @Override
        public void run() {
            int N = 3 * consumers.size();
            //向任务队列中插入有限任务
            for (int i = 0; i < N; i++) {
                String task = "Task " + i;
                queue.addTask(task);
                System.out.println("Producer: adding task " + task);
                try {
                    Thread.sleep(10); // 模拟添加任务的耗时操作
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("Producer: 任务增添完毕");
            for (Consumer c : consumers) {
                System.out.println("Consumer-" + c.getThreadId() + " 关闭中");
                c.interrupt();
            }
        }
    }
    //endregion

    public static void main(String[] args) {
        TaskQueue queue = new TaskQueue();
        Consumer consumer1 = new Consumer(queue, 1);
        Consumer consumer2 = new Consumer(queue, 2);
        Consumer consumer3 = new Consumer(queue, 3);
        Consumer consumer4 = new Consumer(queue, 4);
        List<Consumer> consumers = new ArrayList<>();
        consumers.add(consumer1);
        consumers.add(consumer2);
        consumers.add(consumer3);
        consumers.add(consumer4);
        Producer producer = new Producer(queue, consumers);

        producer.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
        consumer4.start();
    }
}
Producer: adding task Task 0
Consumer-2: processing Task 0
Producer: adding task Task 1
Consumer-1: processing Task 1
Producer: adding task Task 2
Consumer-3: processing Task 2
Producer: adding task Task 3
Consumer-4: processing Task 3
Producer: adding task Task 4
Consumer-2: processing Task 4
Producer: adding task Task 5
Consumer-1: processing Task 5
Producer: adding task Task 6
Consumer-3: processing Task 6
Producer: adding task Task 7
Consumer-4: processing Task 7
Producer: adding task Task 8
Consumer-2: processing Task 8
Producer: adding task Task 9
Consumer-1: processing Task 9
Producer: adding task Task 10
Consumer-3: processing Task 10
Producer: adding task Task 11
Consumer-4: processing Task 11
Producer: 任务增添完毕
Consumer-1 关闭中
Consumer-2 关闭中
Consumer-1 已关闭
Consumer-3 关闭中
Consumer-4 关闭中
Consumer-2 已关闭
Consumer-4 已关闭
Consumer-3 已关闭
知识补充
notify() 和 notifyAll() 对于交替运行的影响
  • 当使用 notify() 方法唤醒等待的线程时,只有一个线程会被唤醒,其他线程仍将保持等待状态。竞争激烈的情况下,随机性是主导因素,更容易产生交替运行。
  • 当使用 notifyAll() 方法唤醒等待的线程时,所有等待的线程都会被唤醒,它们将争夺锁资源。竞争均衡的情况下,性能优化是主导因素,更容易根据性能优化,保证尽可能少的锁释放,尽可能地让某一个线程长时间占有锁,进而导致更小概率的交替运行。
生产消费者模型 和 观察者模式

生产者消费者模型和观察者模式是两个不同的概念,它们之间没有直接的关系。

生产者消费者模型是一种多线程或多进程的同步模型,用于解决生产者和消费者之间的数据通信和协作问题。在这个模型中,生产者负责生成数据,消费者负责消费数据,而且生产者和消费者之间通过一个共享的缓冲区进行通信。生产者将数据放入缓冲区,消费者从缓冲区中取出数据进行处理。

观察者模式是一种软件设计模式,用于定义对象之间的一对多依赖关系,使得当一个对象的状态发生变化时,所有依赖于它的对象都能得到通知并自动更新。在观察者模式中,被观察者对象(也称为主题)维护了一个观察者列表,当其状态发生变化时,会遍历通知所有观察者进行更新操作。

虽然生产者消费者模型和观察者模式是两个独立的概念,但在实际应用中可以结合使用。例如,一个生产者可以作为被观察者对象,而多个消费者可以作为观察者对象,当生产者生产了新的数据时,会通知所有的消费者进行处理。这样可以实现一种基于消息通知的生产者消费者模型。

生产消费者模型 和 等待通知模式

生产者消费者模型和等待通知模式是相关的概念。生产者消费者模型是一种并发模式,用于解决多个线程间共享数据的问题。在该模型中,生产者线程负责生产数据并将其放入共享缓冲区,消费者线程从共享缓冲区中取出数据并进行消费。

而等待通知模式是一种线程同步的方式,用于实现线程间的协作。在等待通知模式中,线程在某些条件满足前进行等待,当条件满足时,其他线程会通知等待的线程继续执行。

在生产者消费者模型中,常常会使用等待通知模式来实现生产者和消费者之间的协作。生产者在生产数据之前,会检查共享缓冲区是否已满,如果已满则进入等待状态;消费者在消费数据之前,会检查共享缓冲区是否为空,如果为空则进入等待状态。当生产者生产数据后,会通知消费者继续执行;当消费者消费数据后,会通知生产者继续执行。通过使用等待通知模式,生产者和消费者能够有效地进行线程间的协作,避免了线程的空轮询等浪费资源的情况。

观察者模式 和 等待通知模式

观察者模式和等待通知模式都是用于实现对象间的消息传递和通知机制的设计模式。

观察者模式(Observer Pattern)定义了一种一对多的依赖关系,使得多个观察者对象同时监听一个主题对象。当主题对象状态发生改变时,会通知所有的观察者对象进行更新。观察者模式是一种松耦合的设计模式,主题对象和观察者对象之间的依赖关系通过抽象接口实现。

等待通知模式(Wait-Notify Pattern)用于线程间的协作。在等待通知模式中,一个线程等待某个条件的满足,另一个线程在满足条件时通知等待的线程继续执行。等待通知模式可以有效地避免线程的忙等待,提高系统的效率。

观察者模式和等待通知模式可以结合使用。在观察者模式中,观察者对象(也可以是线程)等待主题对象的通知,当主题对象的状态改变时,观察者对象被唤醒并进行相应的操作。这样就能实现观察者对象的异步通知和处理。这种结合可以提高系统的可扩展性和灵活性。

关系总结
生产者消费者模型 观察者模式 等待通知模式
协作方式 生产者生产数据,消费者消费数据 被观察者状态改变,通知观察者 线程等待条件满足,被通知继续执行
对象之间关系 生产者和消费者 被观察者和观察者 通知者和等待者
模型特点 多线程/多进程同步模型 一对多依赖关系 线程同步协作方式
直接关系 可以与观察者模式或等待通知模式结合使用 可以与生产者消费者模型结合使用 与生产者消费者模型有直接关系
定位 同步模型 设计模式 设计模式

基于 Condition 实现

实验目的

学习并掌握基于 Condition 的等待通知机制的使用,理解其在实际业务场景中的应用。

实验过程
  1. 创建一个任务队列类 TaskQueue,其中包含以下几个方法:
    • addTask(task): 将新任务加入到队列中。
    • getTask(): 从队列中取出一个任务。
    • wait(): 当队列为空时,消费者线程需要调用此方法进入等待状态。封装在 getTask() 内部
    • notify(): 当加入新任务时,生产者线程调用此方法来通知一个消费者线程来处理新任务。封装在 addTask() 内部
  2. 创建多个任务线程Consumer,它们会不断地调用getTask()来获取任务进行处理。当队列为空时,消费者线程需要调用wait()进入等待状态。
  3. 创建一个生产任务的线程Producer,它会在队列中添加新任务,并调用notify()方法来通知一个任务线程有新任务可供处理。
示例代码
public class ConditionDemo {
    //region 任务队列
    static class TaskQueue {
        private List<String> tasks = new ArrayList<>();
        private Lock lock = new ReentrantLock();
        private Condition condition = lock.newCondition();

        public void addTask(String task) {
            lock.lock();
            try {
                tasks.add(task);
                condition.signal();
            } finally {
                lock.unlock();
            }
        }

        public String getTask() throws InterruptedException {
            lock.lock();
            try {
                while (tasks.isEmpty()) {
                    condition.await();
                }
                return tasks.remove(0);
            } finally {
                lock.unlock();
            }
        }
    }
    //endregion

    //region 消费者线程
    static class Consumer extends Thread {
        private TaskQueue queue;
        private int threadId;

        public Consumer(TaskQueue queue, int id) {
            this.queue = queue;
            this.threadId = id;
        }

        public int getThreadId() {
            return threadId;
        }

        @Override
        public void run() {
            try {
                //持续从队列中读取任务
                while (true) {
                    String task = queue.getTask();
                    // 处理任务
                    System.out.println("Consumer-" + threadId + ": processing " + task);
                }
            } catch (InterruptedException e) {
                System.out.println("Consumer-" + threadId + " 已关闭");
                //e.printStackTrace();
            }
        }
    }
    //endregion

    //region 生产者线程
    static class Producer extends Thread {
        private TaskQueue queue;
        private List<Consumer> consumers;


        public Producer(TaskQueue queue, List<Consumer> consumerList) {
            this.queue = queue;
            this.consumers = consumerList;
        }

        @Override
        public void run() {
            int N = 3 * consumers.size();
            //向任务队列中插入有限任务
            for (int i = 0; i < N; i++) {
                String task = "Task " + i;
                queue.addTask(task);
                System.out.println("Producer: adding task " + task);
                try {
                    Thread.sleep(10); // 模拟添加任务的耗时操作
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("Producer: 任务增添完毕");
            for (Consumer c : consumers) {
                System.out.println("Consumer-" + c.getThreadId() + " 关闭中");
                c.interrupt();
            }
        }
    }
    //endregion

    public static void main(String[] args) {
        TaskQueue queue = new TaskQueue();
        Consumer consumer1 = new Consumer(queue, 1);
        Consumer consumer2 = new Consumer(queue, 2);
        Consumer consumer3 = new Consumer(queue, 3);
        Consumer consumer4 = new Consumer(queue, 4);
        List<Consumer> consumers = new ArrayList<>();
        consumers.add(consumer1);
        consumers.add(consumer2);
        consumers.add(consumer3);
        consumers.add(consumer4);
        Producer producer = new Producer(queue, consumers);

        producer.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
        consumer4.start();
    }

}
Producer: adding task Task 0
Consumer-2: processing Task 0
Producer: adding task Task 1
Consumer-3: processing Task 1
Producer: adding task Task 2
Consumer-1: processing Task 2
Producer: adding task Task 3
Consumer-4: processing Task 3
Producer: adding task Task 4
Consumer-2: processing Task 4
Producer: adding task Task 5
Consumer-3: processing Task 5
Producer: adding task Task 6
Consumer-1: processing Task 6
Producer: adding task Task 7
Consumer-4: processing Task 7
Producer: adding task Task 8
Consumer-2: processing Task 8
Producer: adding task Task 9
Consumer-3: processing Task 9
Producer: adding task Task 10
Consumer-1: processing Task 10
Producer: adding task Task 11
Consumer-4: processing Task 11
Producer: 任务增添完毕
Consumer-1 关闭中
Consumer-2 关闭中
Consumer-3 关闭中
Consumer-4 关闭中
Consumer-1 已关闭
Consumer-4 已关闭
Consumer-2 已关闭
Consumer-3 已关闭

基于 Object Monitor 实现 ? 基于 Condition 实现

本质上就是相同功能的两种不同写法

  • 基于 Object Monitor 实现的等待通知机制,直接使用 Java 提供的 synchronized 实现的隐式锁机制,notify()wait() 本身也是 Java 直接提供的等待通知机制
  • 基于 Condition 实现的等待通知机制,Condition 源于 Lock 接口,往往使用 Lock 接口的显式锁机制,Condition 的 signal()await() 则是通过显式控制等待队列的形式实现的等待通知机制

两种机制都可以实现等待通知的功能,但就像使用 Lock 机制比使用 synchronized 更灵活可控一样,基于 Condition 的实现比基于 Object Monitor 的实现更加灵活和可控。

  • 基于 Condition 的实现允许我们在一个对象上创建多个等待队列,并可以对每个队列进行等待和唤醒操作
  • 基于 Object Monitor 的实现再一个对象上则只能使用同一个等待通知队列