CompletableFuture + LinkedBlockingDeque 实现生产者消费者案例

发布时间 2023-12-13 17:23:46作者: IT界的一名小学生

设计要求:

1. 设计一个生产者生产,消费者消费场景;

2. 使用线程池 CompletableFuture + 队列LinkedBlockingDeque 实现;

3. 生产者生产的数据存储到长度为5的 LinkedBlockingDeque 队列,消费者消费从 LinkedBlockingDeque 队列中取数据;

4. 生产者和消费者均是多线程且不知道谁快谁慢,互不干扰;

5. 消费者根据生产者情况进行结束

以下为模板案例代码,可以封装成工具类使用:

 1 package com.xx.product.manage.service;
 2 
 3 import java.util.concurrent.*;
 4 import java.util.concurrent.atomic.AtomicBoolean;
 5 
 6 public class ProducerConsumerExample {
 7     public static void main(String[] args) {
 8 
 9         LinkedBlockingDeque<Integer>  queue = new LinkedBlockingDeque<>(5);
10         Executor executor = Executors.newFixedThreadPool(2);
11 
12         AtomicBoolean producerFinished = new AtomicBoolean(false);
13 
14         CompletableFuture <Void> producerFuture = CompletableFuture.runAsync(() ->  {
15             try {
16                 for (int i = 1; i <= 10; i++) {
17                     // 生产数据
18                     System.out.println("Producing: " + i);
19 //                    Thread.sleep(3000); //验证 消费者速度 > 生产者速度
20                     queue.put(i);
21 
22                     // 如果队列已满,等待消费者消费数据
23                     if (queue.size() == 5) {
24                         System.out.println("Queue is full. Waiting for consumer...");
25                     }
26                 }
27             } catch (InterruptedException e) {
28                 e.printStackTrace();
29             }
30 
31             // 生产者结束
32             producerFinished.set(true);
33             System.out.println("Producer finished.");
34         }, executor);
35 
36         CompletableFuture <Void>  consumerFuture = CompletableFuture.runAsync(() ->  {
37             try {
38                 while (true) {
39                     // 如果队列为空且生产者已结束,则消费者退出循环
40                     if (queue.isEmpty() && producerFinished.get()) {
41                         break;
42                     }
43 
44                     // 消费数据
45 //                    Thread.sleep(2000); //验证生产者速度 > 消费者速度
46                     Integer item = queue.take();
47                     if (item != null) {
48                         System.out.println("Consuming: " + item);
49                     } else {
50                         // 如果队列为空,等待生产者生产数据
51                         System.out.println("Queue is empty. Waiting for producer...");
52                         Thread.sleep(1000);
53                     }
54                 }
55             } catch (InterruptedException e) {
56                 e.printStackTrace();
57             }
58 
59             System.out.println("Consumer finished.");
60         }, executor);
61 
62         // 等待生产者和消费者都完成
63         CompletableFuture.allOf(producerFuture, consumerFuture).join();
64         System.out.println("All tasks finished.");
65 
66         // 关闭线程池
67         ((ExecutorService) executor).shutdown();
68     }
69 }

 知识点: