手写简单生产者消费者阻塞队列

发布时间 2023-11-07 16:42:25作者: 唔芜舞雾

主要实现生产者定时生产,消费者只要队列消息中有就消费。

import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ProductConsumerQueue<E> {
    private final Queue<E> blockQueue;
    private final ReentrantLock lock;//锁
    private Integer MAX_SIZE = 20;
    private final Condition notFull;
    private final Condition notEmpty;
    ProductConsumerQueue(Integer capacity){
        lock = new ReentrantLock();
        blockQueue = new LinkedList<>();
        MAX_SIZE = capacity;
        notFull = lock.newCondition();
        notEmpty = lock.newCondition();
    }
    public void put(E element) throws InterruptedException {
        if(Objects.isNull(element)) {//消息不能为空
            throw new IllegalArgumentException("blockQueue put element is null");
        }
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//尝试获取锁,若没获取到在等待期间,可被interrupt()中断该等待过程
        try {
            while(blockQueue.size()==MAX_SIZE){//队列是否已满,满则等待队列不为空
                notFull.await();
            }
            blockQueue.add(element);//入队
            notEmpty.signal();//唤起等待队列不为空的线程
        }finally {
            lock.unlock();//解锁
        }
    }

    public E poll() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try{
            if(blockQueue.size()==0){//判断是否为空,为空则等待队列不为空,也可直接返回null,可根据需要自行更改策略
                notEmpty.await();
            }
            E e =blockQueue.poll();//出队
            notFull.signal();//唤醒等待队列未达到最大容量的线程
            return e;
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ProductConsumerQueue<Integer> queue = new ProductConsumerQueue<>(1024);
        AtomicInteger nums = new AtomicInteger(0);//原子类,线程安全
        Thread product = new Thread(()->{
            while(true){
                try {
                    Thread.sleep(200);//定时生产
                    int num = nums.getAndIncrement();
                    queue.put(num);
                    System.out.println("product enqueue"+num);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        Thread consumer = new Thread(()->{
           while(true){
               try {
                   Integer num = queue.poll();
                   System.out.println("consumer dequeue"+num);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
        });
        consumer.start();
        product.start();
    }
}