LinkedBlockingQueue

Published: 2023-10-31 10:34:01. Author: anpeiyong

Overview

  An optionally-bounded blocking queue (BlockingQueue) based on linked nodes.

  This queue orders elements FIFO (first-in-first-out).
  The head of the queue is that element that has been on the queue the longest time.
  The tail of the queue is that element that has been on the queue the shortest time.
  New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.
  Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.

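    In short: an optionally bounded BlockingQueue built on linked nodes.

    Elements are ordered FIFO (first in, first out).
    The head is the element that has been in the queue the longest.
    The tail is the element that has been in the queue the shortest.
    New elements are inserted at the tail, and retrieval operations take elements from the head.
    Linked queues usually achieve higher throughput than array-based queues, but their performance is less predictable in most concurrent applications.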
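
The FIFO ordering described above can be observed with a short sketch. The class and method names (FifoOrderDemo, drainInOrder) are illustrative and not part of the original notes; only offer() and poll() come from LinkedBlockingQueue itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: inserts at the tail, removes from the head.
class FifoOrderDemo {
    static List<String> drainInOrder() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.offer("first");   // becomes the head (oldest element)
        queue.offer("second");
        queue.offer("third");   // becomes the tail (newest element)

        List<String> removed = new ArrayList<>();
        String head;
        // poll() returns the current head, or null once the queue is empty.
        while ((head = queue.poll()) != null) {
            removed.add(head);
        }
        return removed;
    }

    public static void main(String[] args) {
        System.out.println(drainInOrder());
    }
}
```

The elements come back in insertion order, which is what "head is the oldest element" means in practice.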

  

  The optional capacity bound constructor argument serves as a way to prevent excessive queue expansion.
  The capacity, if unspecified, is equal to Integer.MAX_VALUE.
  Linked nodes are dynamically created upon each insertion unless this would bring the queue above capacity.

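    The optional capacity argument guards against excessive queue growth.

    If no capacity is specified, it defaults to Integer.MAX_VALUE.

    A node is created dynamically on each insertion, unless the insertion would push the queue above its capacity.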
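
The capacity rules above can be checked with a small sketch. CapacityBoundDemo and its method names are hypothetical; remainingCapacity() and offer() are the real LinkedBlockingQueue methods.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class contrasting the default capacity with an explicit bound.
class CapacityBoundDemo {
    // A queue built without a capacity argument reports Integer.MAX_VALUE free slots.
    static int unboundedRemaining() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    // Three offer() calls on a queue bounded at two elements:
    // the third call finds the queue full and returns false instead of blocking.
    static boolean[] offersToBoundedQueue() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        return new boolean[] { queue.offer("a"), queue.offer("b"), queue.offer("c") };
    }

    public static void main(String[] args) {
        System.out.println(unboundedRemaining() == Integer.MAX_VALUE);
        System.out.println(java.util.Arrays.toString(offersToBoundedQueue()));
    }
}
```

In practice an explicit bound is usually the safer choice when producers can outpace consumers, since the default leaves the queue effectively unbounded.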

  

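public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {

    /** Linked list node class */
    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements (used by offer() and poll() below) */
    private final AtomicInteger count = new AtomicInteger();

    /** Head of linked list. Invariant: head.item == null */
    transient Node<E> head;

    /** Tail of linked list. Invariant: last.next == null */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();
    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();
    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        // The queue starts with a single dummy node; head and last both point to it.
        last = head = new Node<E>(null);
    }

}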
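
As a quick check of the constructor shown above, the following sketch exercises the capacity validation and the initial empty state. ConstructorCheckDemo and its method names are illustrative, not part of the JDK.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class for the constructor behavior.
class ConstructorCheckDemo {
    // Returns true if the constructor rejects the given capacity.
    static boolean rejects(int capacity) {
        try {
            new LinkedBlockingQueue<String>(capacity);
            return false;
        } catch (IllegalArgumentException expected) {
            return true; // thrown by the "capacity <= 0" check
        }
    }

    // A freshly built queue holds only the internal dummy node, so it reports no elements.
    static boolean startsEmpty() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        return queue.isEmpty() && queue.peek() == null;
    }

    public static void main(String[] args) {
        System.out.println(rejects(0) + " " + rejects(-1) + " " + rejects(1));
        System.out.println(startsEmpty());
    }
}
```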

  

Call flow

offer(E e) 

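// java.util.concurrent.LinkedBlockingQueue.offer(E)
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // Fast path: give up without locking if the queue is already full.
        if (count.get() == capacity)
            return false;
        int c = -1; // stays -1 if nothing is enqueued
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // Re-check under putLock: another producer may have filled the queue.
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement(); // c is the count before this insert
                // Still room after this insert: wake another waiting producer.
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        // The queue was empty before this insert: wake a waiting consumer.
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }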

    // java.util.concurrent.LinkedBlockingQueue.enqueue
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        // Link the node after the current tail, then make it the new tail.
        last = last.next = node;
    }
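
The two externally visible effects of offer() above, rejecting null and waking a consumer when the queue goes from empty to non-empty, can be sketched as follows. OfferHandoffDemo, rejectsNull, and handOff are hypothetical names; the 5-second join timeout is an arbitrary safety margin for the demo.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class for offer() behavior.
class OfferHandoffDemo {
    // offer(null) fails with NullPointerException before any lock is taken.
    static boolean rejectsNull() {
        try {
            new LinkedBlockingQueue<String>().offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    // A consumer thread waits in take(); a non-blocking offer() delivers the element to it.
    static String handOff(String message) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        AtomicReference<String> received = new AtomicReference<>();
        Thread consumer = new Thread(() -> {
            try {
                received.set(queue.take()); // blocks while the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        boolean accepted = queue.offer(message); // returns immediately
        try {
            consumer.join(5_000); // assumed timeout, only to keep the demo from hanging
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return accepted ? received.get() : null;
    }

    public static void main(String[] args) {
        System.out.println(rejectsNull());
        System.out.println(handOff("job-1"));
    }
}
```

The result does not depend on which thread runs first: if offer() happens before take(), the consumer finds the element immediately; otherwise the consumer is signalled once the element is enqueued.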

  

poll()

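// java.util.concurrent.LinkedBlockingQueue.poll()
    public E poll() {
        final AtomicInteger count = this.count;
        // Fast path: return null without locking if the queue is empty.
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1; // stays -1 if nothing is dequeued
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // Re-check under takeLock: another consumer may have emptied the queue.
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement(); // c is the count before this removal
                // Elements remain after this removal: wake another waiting consumer.
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        // The queue was full before this removal: wake a waiting producer.
        if (c == capacity)
            signalNotFull();
        return x;
    }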

    // java.util.concurrent.LinkedBlockingQueue.dequeue
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
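        Node<E> h = head;          // current dummy node
        Node<E> first = h.next;    // node holding the oldest element
        h.next = h; // help GC
        head = first;              // first becomes the new dummy node
        E x = first.item;
        first.item = null;         // restore the invariant head.item == null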
        return x;
    }
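
The dummy-head technique used by enqueue and dequeue above can be isolated in a simplified, single-threaded sketch. SentinelQueue is a hypothetical class written for illustration: it deliberately omits the locks, conditions, counter, and capacity check of the real implementation, and it detects an empty queue by checking head.next rather than count.

```java
// Simplified, non-thread-safe sketch of the linked list with a dummy head node.
class SentinelQueue<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    // head is always a dummy node with a null item; the first real element is head.next.
    private Node<E> head;
    // last is the final node in the list; last.next is always null.
    private Node<E> last;

    SentinelQueue() {
        last = head = new Node<>(null);
    }

    // Link the new node after the current tail, then advance the tail.
    void enqueue(E e) {
        Node<E> node = new Node<>(e);
        last.next = node;
        last = node;
    }

    // Remove and return the oldest element, or null if the queue is empty.
    E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        if (first == null) {
            return null; // assumption of this sketch; the real class checks count first
        }
        h.next = h;          // self-link the old dummy node
        head = first;        // the first real node becomes the new dummy
        E x = first.item;
        first.item = null;   // a dummy node never holds an item
        return x;
    }

    public static void main(String[] args) {
        SentinelQueue<Integer> queue = new SentinelQueue<>();
        queue.enqueue(1);
        queue.enqueue(2);
        System.out.println(queue.dequeue());
        System.out.println(queue.dequeue());
        System.out.println(queue.dequeue());
    }
}
```

Because enqueue only touches last and dequeue only touches head, the two ends can be guarded by separate locks, which is what putLock and takeLock do in the real class.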