CyclicBarrier源码分析

发布时间 2023-05-06 09:18:22作者: 无虑的小猪

1、CyclicBarrier的介绍

  CyclicBarrier 被称为栅栏,允许一组线程相互等待,直到这一组线程都准备完毕,放行,程序方可继续执行。

  就好像做摩天轮,游乐园规定,至少有9个游客乘坐摩天轮,管理员才可以启动摩天轮,游客数和管理员少一个条件,摩天轮都不会启动。

2、CyclicBarrier的使用

  根据上面摩天轮的案例,程序代码如下:

 1 import java.util.concurrent.BrokenBarrierException;
 2 import java.util.concurrent.CyclicBarrier;
 3 
 4 public class TestCyclicBarrier {
 5 
 6     public static void main(String[] args) throws Exception {
 7         int parties = 10;
 8         CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
 9             System.out.println("======== 启动摩天轮.... ========");
10         });
11 
12         for (int i = 1; i <= parties - 1; i++) {
13             final int current = i;
14             new Thread(() -> {
15                 System.out.println("编号为 " + current + " 的游客已准备就绪");
16                 try {
17                     if (current < 9) {
18                         System.out.println("sorry,游客数不足,无法启动摩天轮");
19                     } else {
20                         System.out.println("OK,游客数已达标,准备启动摩天轮");
21                     }
22                     barrier.await();
23 
24                     System.out.println("编号为 " + current + " 的游客尖叫");
25                 } catch (InterruptedException e) {
26                     e.printStackTrace();
27                 } catch (BrokenBarrierException e) {
28                     e.printStackTrace();
29                 }
30             }).start();
31         }
32 
33         System.out.println("管理员已到位");
34         barrier.await();
35 
36     }
37 }

  执行结果如下:

  

3、CyclicBarrier的源码分析

  与CountDownLatch、Semaphore直接基于AQS实现不同,CyclicBarrier 是基于 ReentrantLock + ConditionObject 实现的,间接基于AQS实现的。有关ConditionObject的分析,参考:

ConditionObject源码分析

3.1、CyclicBarrier概览

  Generation,静态内部类,持有布尔类型的属性broken,默认为false,只有在重置方法reset()、执行出现异常或中断调用breakBarrier() ,属性会被设置为true。

  nextGenerate() 重置 CyclicBarrier 的计数器和generation属性。

  breakBarrier() 任务执行中断、异常、被重置,将Generation中的布尔类型属性设置为true,将Waiter队列中的线程转移到AQS队列中,待执行完unlock方法后,唤醒AQS队列中的挂起线程。

  await() :CyclicBarrier的核心方法,计数器递减处理。

  

3.2、构造函数

  构造参数重载,最终调用的是CyclicBarrier(int, Runnable),详情如下:

 1 public CyclicBarrier(int parties) {
 2     this(parties, null);
 3 }
 4 
 5 public CyclicBarrier(int parties, Runnable barrierAction) {
 6     // 参数合法性校验
 7     if (parties <= 0) throw new IllegalArgumentException();
 8     // final修饰,所有线程执行完成归为或重置时 使用
 9     this.parties = parties;
10     // 在await方法中计数值,表示还有多少线程待执行await
11     this.count = parties;
12     // 当计数count为0时 ,执行此Runnnable,再唤醒被阻塞的线程
13     this.barrierCommand = barrierAction;
14 }

3.3、CyclicBarrier属性

  

3.4、核心方法分析

1、await() - 源码分析

  在CyclicBarrier中,await有重载方法。await()表示会一直等待指定数量的线程未准备就绪(执行await方法);await(timout, unit)表示等待timeout时间后,指定数量的线程未准备就绪,抛出TimeoutException超时异常。

CyclicBarrier#await 详情如下:

 1 // 执行没有超时时间的await
 2 public int await() throws InterruptedException, BrokenBarrierException {
 3     try {
 4         // 执行dowait()
 5         return dowait(false, 0L);
 6     } catch (TimeoutException toe) {
 7         throw new Error(toe); 
 8     }
 9 }
10 
11 // 执行有超时时间的await
12 public int await(long timeout, TimeUnit unit)
13     throws InterruptedException,
14            BrokenBarrierException,
15            TimeoutException {
16     return dowait(true, unit.toNanos(timeout));
17 }

  await最终调用dowait()方法,CyclicBarrier#dowait 详情如下:

 1 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
 2     // 获取锁对象
 3     final ReentrantLock lock = this.lock;
 4     // 加锁
 5     lock.lock();
 6     try {
 7         // 获取generation对象
 8         final Generation g = generation;
 9         
10         // 这组线程中在执行过程中是否异常、超时、中断、重置
11         if (g.broken)
12             throw new BrokenBarrierException();
13         
14         // 这组线程被中断,重置标识与计数值,
15         //     将Waiter队列中的线程转移到AQS队列,抛出InterruptedException
16         if (Thread.interrupted()) {
17             breakBarrier();
18             throw new InterruptedException();
19         }
20         
21         // 计数值 - 1
22         int index = --count;
23         // 这组线程都已准备就绪
24         if (index == 0) { 
25             // 执行结果标识
26             boolean ranAction = false;
27             try {
28                 // 若使用2个参数的有参构造,就传入了自实现任务,index == 0,先执行CyclicBarrier有参的任务
29                 //     此处设计与 FutureTask 构造参数设计类似
30                 final Runnable command = barrierCommand;
31                 if (command != null)
32                     // 执行任务
33                     command.run();
34                 // 执行完成,设置为true
35                 ranAction = true;
36                 // CyclicBarrier属性归位
37                 nextGeneration();
38                 return 0;
39             } finally {
40                 // 执行过程中出现问题
41                 if (!ranAction)
42                     // 重置标识与计数值,将Waiter队列中的线程转移到AQS队列
43                     breakBarrier();
44             }
45         }
46 
47         // -- 之后,count不为0,表示还有线程在等待
48         // 自旋 直到被中断、超时、异常、count = 0
49         for (;;) {
50             try {
51                 // 未设置超时时间
52                 if (!timed)
53                     // 挂起线程,将线程转移到 Condition 队列
54                     trip.await();
55                 // 未达到等待时间
56                 else if (nanos > 0L)
57                     // 挂起线程,并返回剩余等待时间
58                     nanos = trip.awaitNanos(nanos);
59             } catch (InterruptedException ie) {
60                 // 中断异常
61                 if (g == generation && ! g.broken) {
62                     breakBarrier();
63                     throw ie;
64                 } else {
65                     // 线程中断
66                     Thread.currentThread().interrupt();
67                 }
68             }
69             
70             // 该组线程被中断、执行异常、超时,抛出BrokenBarrierException异常
71             if (g.broken)
72                 throw new BrokenBarrierException();
73             
74             if (g != generation)
75                 return index;
76             
77             // 超时,抛出异常TimeoutException
78             if (timed && nanos <= 0L) {
79                 breakBarrier();
80                 throw new TimeoutException();
81             }
82         }
83     } finally {
84         // 释放锁资源
85         lock.unlock();
86     }
87 }

2、breakBarrier() - 结束CyclicBarrier的执行

 1 // 结束CyclicBarrier的执行
 2 private void breakBarrier() {
 3     // 设置线程执行过程中是否异常、中断、重置标识
 4     generation.broken = true;
 5     // 重置计数值
 6     count = parties;
 7     // 将Condition队列中的Node转移到AQS队列中,等到执行完unlock,AQS队列中的挂起线程会被唤醒
 8     // 有后继节点的,设置ws = -1;
 9     // 无后继节点的,设置ws = 0
10     trip.signalAll();
11 }

3、reset() - 重置CyclicBarrier

 1 // 重置CyclicBarrier
 2 public void reset() {
 3     // 获取锁对象
 4     final ReentrantLock lock = this.lock;
 5     // 加锁
 6     lock.lock();
 7     try {
 8         // 设置当前generation属性,并将Waiter队列中线程转移到AQS队列
 9         breakBarrier();  
10         // 重置generation 属性、计数值
11         nextGeneration();
12     } finally {
13         // 释放锁
14         lock.unlock();
15     }
16 }

4、nextGeneration() - CyclicBarrier归位

  与reset()不同,nextGeneration()是在任务执行完成后,对 CyclicBarrier 做归位,不会设置线程执行异常、超时、中断标识。

1 private void nextGeneration() {
2     // 将Waiter队列中线程转移到AQS队列
3     trip.signalAll();
4     // 计数值、generation 归位
5     count = parties;
6     generation = new Generation();
7 }

4、总结

  CyclicBarrier基于 ReentrantLock + ConditionObject实现,CyclicBarrier的构造函数中必须指定parties,同时对象generation,内部持有布尔型属性表示当前CyclicBarrier执行过程中是否有超时、异常、中断的情况。

  parties是初始待执行线程数,在构造函数中会将parties赋给计数值count,每当一个线程执行await(),count就会减1。

  当count被减为0时,代表所有线程都准备就绪,此时判断构造函数是否初始化了barrierCommand属性,若对barrierCommand属性做了赋值,优先执行barrierCommand任务;

  barrierCommand任务执行完成,再将Waiter队列中的线程转移到AQS队列中,执行完unlock,唤醒AQS队列中的线程;计数值count、generation归位。