LinkedBlockingQueue源码分析

发布时间 2023-05-17 17:11:56作者: 无虑的小猪

1、LinkedBlockingQueue使用

  LinkedBlockingQueue的使用案例详情如下:

 1 import java.util.concurrent.LinkedBlockingDeque;
 2 import java.util.concurrent.LinkedBlockingQueue;
 3 
 4 public class TestLinkedBlockingQueue {
 5     public static void main(String[] args) throws Exception {
 6         LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
 7         // poll、peek、take  获取元素
 8         queue.add("123");
 9         // peek  获取队列头元素,不移除元素
10         String peek = queue.peek();
11         System.out.println(peek);
12         // poll  获取队列头元素,移除元素
13         String poll = queue.poll();
14         System.out.println(poll);
15 //        queue.add("234");
16 
17         // take  获取队列头元素,移除元素,队列为空,阻塞
18 //        String take = queue.take();
19 //        System.out.println(take);
20 
21 
22         // add、offer、put  添加元素
23         // add 添加元素,队列满了,抛出异常
24         boolean add01 = queue.add("1");
25         System.out.println("add01 ==> " + add01);
26         //  add 添加元素,队列满了,返回false
27         boolean offer = queue.offer("2");
28         System.out.println("offer ==> " + offer);
29         try {
30             boolean add02 = queue.add("1");
31             System.out.println("add02 ==> " + add02);
32         }catch (Exception e) {
33             e.printStackTrace();
34         }
35 
36         queue.clear();
37         // put 添加元素,队列满了,阻塞等待
38         queue.put("3");
39         System.out.println("put01 ==> " + queue.peek());
40         queue.put("4");
41     }
42 }

2、LinkedBlockingQueue继承体系  

  

  与ArrayBlockingQueue一样,LinkedBlockingQueue实现了Queue、BlockingQueue接口,继承AbstractCollection类。相关父类分析可参考ArrayBlockingQueue源码分析中介绍,此处不再赘述。

3、LinkedBlockingQueue源码分析

  LinkedBlockingQueue的成员变量:

 1 // 队列的容量,若不指定,默认 Integer.MAX_VALUE
 2 private final int capacity;
 3 // 队列中元素的数量
 4 private final AtomicInteger count = new AtomicInteger();
 5 // 队列头节点
 6 transient Node<E> head;
 7 // 队列尾节点
 8 private transient Node<E> last;
 9 
10 // 读锁   take() 等获取元素用到的可重入排他锁
11 private final ReentrantLock takeLock = new ReentrantLock();
12 
13 // 阻塞获取数据,用于线程挂起、线程唤醒
14 private final Condition notEmpty = takeLock.newCondition();
15 
16 // put() 等添加元素用到的可重入排它锁
17 private final ReentrantLock putLock = new ReentrantLock();
18 
19 // 写锁  阻塞添加数据,用于线程挂起、线程唤醒
20 private final Condition notFull = putLock.newCondition();

  LinkedBlockingQueue的链表节点静态类:

1 static class Node<E> {
2     // 元素
3     E item;
4     // 指向下一节点的Node
5     Node<E> next;
6     // 构造函数
7     Node(E x) { item = x; }
8 }

   LinkedBlockingQueue基于Node节点实现元素的存储,通过next属性将添加的元素构成一个单向链表,在成员变量中声明了链表的头节点head、链表的尾节点last。

3.1、添加元素

  LinkedBlockingQueue#enqueue(node) 详情如下:
1 // 元素添加进队列
2 private void enqueue(Node<E> node) {
3     // 将node添加进队列 - 单向链表的尾部
4     last = last.next = node;
5 }

3.1.1、offer(e) - 添加元素

  LinkedBlockingQueue#offer(e) 详情如下:

 1 // 添加元素至队列尾
 2 public boolean offer(E e) {
 3     // 添加元素为null,抛出异常 
 4     if (e == null) throw new NullPointerException();
 5     // 获取当前队列中元素总数
 6     final AtomicInteger count = this.count;
 7     // 队列已满,添加元素失败,返回1false
 8     if (count.get() == capacity)
 9         return false;
10     // 添加元素e前,队列中的元素总数
11     int c = -1;
12     // 创建Node节点,封装添加的e元素
13     Node<E> node = new Node<E>(e);
14     // 获取锁对象
15     final ReentrantLock putLock = this.putLock;
16     // 加锁
17     putLock.lock();
18     try {
19         // 队列元素总数 小于 队列容量 
20         if (count.get() < capacity) {
21             // 元素添加进队列中
22             enqueue(node);
23             // 队列元素总数 + 1
24             c = count.getAndIncrement();
25             // 当前元素添加进队列后,队列未满
26             if (c + 1 < capacity)
27                 // 唤醒其他添加元素线程
28                 notFull.signal();
29         }
30     } finally {
31         // 释放锁
32         putLock.unlock();
33     }
34     // c = count.getAndIncrement() => 先获取队列中的元素总数,若之前队列为空,即count = 0
35     // 添加元素后,队列元素总数为1,此时需要唤醒take阻塞获取元素的线程
36     if (c == 0)
37         signalNotEmpty();
38     // 返回添加结果
39     return c >= 0;
40 }

  offer(e)在队列尾部添加元素,若队列中的数组元素已满,返回false;否则将元素添加进队列中,返回false。

  若添加元素后队列未满,唤醒其他添加元素的线程(put);

  若添加元素前,队列为空,在元素添加成功后,唤醒获取元素的线程(take)。

  与ArrayBlockingQueue基于数组添加不同的是,LinkedBlockingQueue基于链表添加元素。

3.1.2、offer(e, timeout, unit)

  LinkedBlockingQueue#offer(e, timeout, unit) 详情如下:

 1 // 添加元素至队列尾
 2 public boolean offer(E e, long timeout, TimeUnit unit)
 3     throws InterruptedException {
 4     // 添加元素为null,抛出异常 
 5     if (e == null) throw new NullPointerException();
 6     // 阻塞时间转化为纳秒
 7     long nanos = unit.toNanos(timeout);
 8     // 添加元素e前,队列中的元素总数
 9     int c = -1;
10     // 获取锁对象
11     final ReentrantLock putLock = this.putLock;
12     // 获取当前队列中元素总数
13     final AtomicInteger count = this.count;
14     // 加锁
15     putLock.lockInterruptibly();
16     try {
17         // 队列已满
18         while (count.get() == capacity) {
19             // 阻塞时间已达到
20             if (nanos <= 0)
21                 // 返回false
22                 return false;
23             // 阻塞时间未达到,继续阻塞,并返回阻塞的剩余时间
24             nanos = notFull.awaitNanos(nanos);
25         }
26         // 添加进队列中
27         enqueue(new Node<E>(e));
28         // 获取添加元素前的总数 c 并获取添加元素后的总数 count + 1
29         c = count.getAndIncrement();
30         // 当前元素添加进队列后,队列未满
31         if (c + 1 < capacity)
32             // 唤醒其他添加元素线程
33             notFull.signal();
34     } finally {
35         // 释放锁资源
36         putLock.unlock();
37     }
38     // c = count.getAndIncrement() => 先获取队列中的元素总数,若之前队列为空,即count = 0
39     // 添加元素后,队列元素总数为1,此时需要唤醒take阻塞获取元素的线程
40     if (c == 0)
41         signalNotEmpty();
42     // 添加成功返回false
43     return true;
44 } 

  offer(e, timeout, unit)在队列尾部添加元素,若队列中的数组元素已满,阻塞timeout时间,若阻塞时间已达到,仍未添加成功,返回false。

  阻塞时间未到达添加成功,若添加元素后队列未满,唤醒其他添加元素的线程(put);

  若添加元素前,队列为空,在元素添加成功后,唤醒获取元素的线程(take)。

3.1.3、put(e) - 添加元素

  LinkedBlockingQueue#put(e) 详情如下:

 1 // 添加元素至队列尾
 2 public void put(E e) throws InterruptedException {
 3     // 添加元素为null,抛出异常 
 4     if (e == null) throw new NullPointerException();
 5     // 添加元素e前,队列中的元素总数
 6     int c = -1;
 7     // 创建Node节点,封装添加的e元素
 8     Node<E> node = new Node<E>(e);
 9      // 获取锁对象
10     final ReentrantLock putLock = this.putLock;
11     // 获取当前队列中元素总数
12     final AtomicInteger count = this.count;
13     // 加锁
14     putLock.lockInterruptibly();
15     try {
16         // 当前队列已满,put操作挂起线程
17         while (count.get() == capacity) {
18             notFull.await();
19         }
20         // 元素添加进队列
21         enqueue(node);
22         // 获取添加元素前的总数 c 并获取添加元素后的总数 count + 1
23         c = count.getAndIncrement();
24         // 添加元素后,队列未满,唤醒put添加元素线程操作
25         if (c + 1 < capacity)
26             notFull.signal();
27     } finally {
28         // 释放锁资源
29         putLock.unlock();
30     }
31     // c = count.getAndIncrement() => 先获取队列中的元素总数,若之前队列为空,即count = 0
32     // 添加元素后,队列元素总数为1,此时需要唤醒take阻塞获取元素的线程
33     if (c == 0)
34         signalNotEmpty();
35 }

  put(e)阻塞添加元素至队列尾部,若当前队列已满,当前put线程挂起。当获取元素线程从队列中拿走元素,队列中有可取空间时,唤醒挂起的put线程,将元素添加进队列中。

  若添加元素后队列未满,唤醒其他添加元素的线程(put);

  若添加元素前,队列为空,在元素添加成功后,唤醒获取元素的线程(take)。

3.1.4、总结

方法

不同点

offer(e)

队列容量已满,添加失败,直接返回false

offer(e,timeout,unit)

队列容量已满,阻塞等待timeout时间,若阻塞时间到达,仍添加失败,直接返回false

put(e)

队列容量已满,阻塞等待直到队列中有可添加的空间

 

3.2、获取元素

  出队流程如下:

  

   LinkedBlockingQueue#dequeue() 详情如下:

 1 // 从队列头元素中移除并返回元素
 2 private E dequeue() {
 3     // 获取队列头节点
 4     Node<E> h = head;
 5     // 获取队列头节点的next
 6     Node<E> first = h.next;
 7     // 将当前队列头节点的next作为新的头节点,原头节点通过GC回收
 8     h.next = h;
 9     head = first;
10     // 获取并记录新头节点的元素值
11     E x = first.item;
12     // 当前节点的元素值设置为null
13     first.item = null;
14     // 返回当前节点的元素值
15     return x;
16 }

3.2.1、peek() - 获取元素

  LinkedBlockingQueue#peek() 详情如下:

 1 // 获取队列头节点
 2 public E peek() {
 3     // 队列中无可取的元素
 4     if (count.get() == 0)
 5         // 返回null
 6         return null;
 7     // 获取锁对象
 8     final ReentrantLock takeLock = this.takeLock;
 9     // 加锁
10     takeLock.lock();
11     try {
12         // 获取队列元素头节点
13         Node<E> first = head.next;
14         // 头节点为null,返回null
15         if (first == null)
16             return null;
17         // 头节点不为null,返回节点中的元素值
18         else
19             return first.item;
20     } finally {
21         // 释放锁
22         takeLock.unlock();
23     }
24 }

  peek()获取队列中的头元素,若队列中无可取的元素,返回null;获取head的next节点,若该节点为null,则返回null,否则返回节点中的item值。

3.2.2、poll() - 获取元素

  LinkedBlockingQueue#poll() 详情如下:

 1 // 获取队列头节点
 2 public E poll() {
 3     // 获取队列中的元素总数
 4     final AtomicInteger count = this.count;
 5     // 队列中无可取的元素,返回null
 6     if (count.get() == 0)
 7         return null;
 8     // 返回元素变量
 9     E x = null;
10     // 本次获取元素操作前,队列中元素的总数
11     int c = -1;
12     // 获取 take 的锁对象
13     final ReentrantLock takeLock = this.takeLock;
14     // 加锁
15     takeLock.lock();
16     try {
17         // 队列中有可取元素
18         if (count.get() > 0) {
19             // 获取队列头元素
20             x = dequeue();
21             // 得到本次获取元素前的总数 c 并对获取元素后的总数 count - 1
22             c = count.getAndDecrement();
23             // 本次获取元素前,队列非空,至少有两个可取元素
24             if (c > 1)
25                 // 唤醒其他获取队列元素的线程
26                 notEmpty.signal();
27         }
28     } finally {
29         // 释放锁资源
30         takeLock.unlock();
31     }
32     // 本次获取元素前,队列已满,本次获取元素后,当前count = capacity - 1,
33     // 队列中有可用空间,唤醒put线程添加元素
34     if (c == capacity)
35         signalNotFull();
36     // 返回元素值
37     return x;
38 }

  poll()获取队列中的头元素,若队列中无可取的元素,返回null;若队列中有多个可取元素,获取队列头节点并唤醒其他获取元素线程,若在本次获取元素前,队列已满,本次获取元素后,当前count = capacity - 1,队列中有可用空间,唤醒put线程添加元素。

3.2.3、poll(timeout, unit)

  LinkedBlockingQueue#poll(timeout, unit) 详情如下:

 1 // 获取队列头节点(阻塞指定时间)
 2 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 3     // 返回元素变量
 4     E x = null;
 5     // 本次获取元素操作前,队列中元素的总数
 6     int c = -1;
 7     // 转换为纳秒
 8     long nanos = unit.toNanos(timeout);
 9     // 当前队列中元素总数
10     final AtomicInteger count = this.count;
11     // 获取锁对象
12     final ReentrantLock takeLock = this.takeLock;
13     // 加锁
14     takeLock.lockInterruptibly();
15     try {
16         // 当前队列可取元素为空
17         while (count.get() == 0) {
18             // 阻塞时间已达到
19             if (nanos <= 0)
20                 // 返回null
21                 return null;
22             // 阻塞时间未达到,继续阻塞,并返回剩余阻塞时间
23             nanos = notEmpty.awaitNanos(nanos);
24         }
25         // 获取队列头元素
26         x = dequeue();
27         // 得到本次获取元素前的总数 c 并对获取元素后的总数 count - 1
28         c = count.getAndDecrement();
29         // 本次获取元素前,队列非空,至少有两个可取元素
30         if (c > 1)
31             // 唤醒其他获取队列元素的线程
32             notEmpty.signal();
33     } finally {
34         // 释放锁资源
35         takeLock.unlock();
36     }
37     // 本次获取元素前,队列已满,本次获取元素后,当前count = capacity - 1,
38     // 队列中有可用空间,唤醒put线程添加元素
39     if (c == capacity)
40         signalNotFull();
41     // 返回元素值
42     return x;
43 }

  poll(timeout, unit)获取队列中的头元素,若队列中无可取的元素,阻塞timeout时间,阻塞时间已达到仍未获取到头元素,返回null;若获取到队列头元素,并且队列中有多个可取元素,获取队列头节点并唤醒其他获取元素线程(poll、take),若在本次获取元素前,队列已满,本次获取元素后,当前count = capacity - 1,队列中有可用空间,唤醒put线程添加元素。

3.2.4、take() - 阻塞获取元素

  LinkedBlockingQueue#take() 详情如下:

 1 // 获取队列头节点(阻塞)
 2 public E take() throws InterruptedException {
 3     // 返回元素变量
 4     E x;
 5     // 本次获取元素操作前,队列中元素的总数
 6     int c = -1;
 7     // 当前队列中元素总数
 8     final AtomicInteger count = this.count;
 9     // 获取锁对象
10     final ReentrantLock takeLock = this.takeLock;
11     // 加锁
12     takeLock.lockInterruptibly();
13     try {
14         // 当前队列可取元素为空,挂起线程,待put、offer添加元素成功后,唤醒线程
15         while (count.get() == 0) {
16             notEmpty.await();
17         }
18         // 获取队列头元素
19         x = dequeue();
20        // 得到本次获取元素前的总数 c 并对获取元素后的总数 count - 1
21         c = count.getAndDecrement();
22         // 本次获取元素前,队列非空,至少有两个可取元素
23         if (c > 1)
24             // 唤醒其他获取队列元素的线程
25             notEmpty.signal();
26     } finally {
27         // 释放锁
28         takeLock.unlock();
29     }
30     // 本次获取元素前,队列已满,本次获取元素后,当前count = capacity - 1,
31     // 队列中有可用空间,唤醒put线程添加元素
32     if (c == capacity)
33         signalNotFull();
34     // 返回元素值
35     return x;
36 }

  take()获取队列中的头元素,若队列中无可取的元素,挂起线程,等到put/offer添加元素成功后唤醒put线程;获取队列头元素,若队列中有多个可取元素,获取队列头节点并唤醒其他获取元素线程,若在本次获取元素前,队列已满,本次获取元素后,当前count = capacity - 1,队列中有可用空间,唤醒put线程添加元素。

3.2.5、总结

方法
不同点
peek()
队列无可取元素,获取失败,直接返回null
poll()
队列无可取元素,直接返回null
poll(timeout,unit)
队列无可取元素,阻塞等待timeout时间,若阻塞时间到达,仍获取失败,直接返回null
take()
队列无可取元素,阻塞等待直到队列中有可取元素

3.3、删除元素

  元素移除队列,LinkedBlockingQueue#remove() 详情如下:

 1 // 删除元素
 2 public boolean remove(Object o) {
 3     // 删除元素为null,返回false
 4     if (o == null) return false;
 5     // 加锁,写锁/读锁
 6     fullyLock();
 7     try {
 8         // 遍历单向链表
 9         for (Node<E> trail = head, p = trail.next;
10              p != null;
11              trail = p, p = p.next) {
12              // 匹配到链表中的元素
13             if (o.equals(p.item)) {
14                 // 将当前Node从链表中移除
15                 unlink(p, trail);
16                 // 返回移除成功标识
17                 return true;
18             }
19         }
20         // 返回移除成功标识
21         return false;
22     } finally {
23         fullyUnlock();
24     }
25 }
26 
27 // 加锁 写锁、读锁
28 void fullyLock() {
29     putLock.lock();
30     takeLock.lock();
31 }
32 
33 // 释放锁 写锁、读锁
34 void fullyUnlock() {
35     takeLock.unlock();
36     putLock.unlock();
37 }

  Node移除链表,LinkedBlockingQueue#unlink() 详情如下:

 1 // 移除Node
 2 void unlink(Node<E> p, Node<E> trail) {
 3     // 当前Node的item设置为null
 4     p.item = null;
 5     // 将当前Node的上一节点prev指向当前Node的上一节点next
 6     trail.next = p.next;
 7     // 若p是尾节点,则当前Node的prev设置为尾节点
 8     if (last == p)
 9         last = trail;
10     // 如果当前队列满,删除后,也不忘记唤醒等待的线程
11     if (count.getAndDecrement() == capacity)
12         notFull.signal();
13 }

4、总结

  LinkedBlockingQueue基于单向链表实现元素存取,链表中的节点用Node表示,Node中包含当前元素值及指向下一节点的指向next。LinkedBlockingQueue内部维护Atomic原子类count成员变量,记录当前数组中的元素数量,维护当前队列头节点head、尾节点last。LinkedBlockingQueue内部提供了读锁和写锁,读写不互斥。LinkedBlockingQueue数据结构详情如下:

0

  添加元素时,优先判断count值是否等于队列容量,即队列是否满了,若队列满了,调用不同的添加方法有不同的结果。若在添加元素前队列为空,在添加元素后,唤醒(take)获取元素线程;

  获取元素时,优先判断count值是否等于0,即队列是否为空,对队列为空,直接返回null;poll/take在获取元素前,队列中有多个元素,唤醒(take)获取线程,若在获取队列前队列已满,在获取元素后,队列中有可用空间,唤醒(put)添加元素线程。

  LinkedBlockingQueue阻塞添加、获取队列元素是基于AQS中的ConditionObject实现的。