【Java 并发】【九】【AQS】【六】CyclicBarrier栅栏底层机制原理

发布时间 2023-04-07 08:39:03作者: 酷酷-

1  前言

接下来我们来看看CyclicBarrier,也是基于之前讲解的AQS来实现的,建立在AQS体系之上的一个并发工具类。

2  CyclicBarrier是什么

CyclicBarrier,就是一个计数器栅栏,也就是一个计数器开关。
比如CyclicBarrier barrier = new CyclicBarrier(3),最开始的时候这个栅栏上有3道锁,每次调用barrier.await方法的时候就会减少栅栏上面的一道锁,调用3次await方法的时候就相当于3道锁都清除了,这个时候栅栏就会打开了。如果线程调用了await方法之后,栅栏上面还有锁,那么这个线程就得在那里等着了,等别的哥们也调用await方法清除栅栏上面的锁,直到栅栏上面的所有锁都清除了之后栅栏才会打开。
我们还是写个例子,感受下:

public class CyclicBarrierTest {
    // 声明一个有3道锁的栅栏
    public static CyclicBarrier barrier = new CyclicBarrier(3);
    // 等待通过栅栏的线程
    public static class WaiterThread extends Thread {
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + "到了,先等着");
                barrier.await();
                System.out.println(Thread.currentThread().getName() + "放行了");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 创建3个等待通过栅栏的线程
        WaiterThread waiter1 = new WaiterThread();
        WaiterThread waiter2 = new WaiterThread();
        WaiterThread waiter3 = new WaiterThread();

        waiter1.start();
        waiter2.start();
        waiter3.start();

        waiter1.join();
        waiter2.join();
        waiter3.join();
        System.out.println("运行结束");
    }
}

只有三个线程都调用了await方法之后,也就是栅栏上面的锁都打开了之后,才放行。
我们上节说CountDownLatch也是类似计数器,那么两者有什么区别呢?
CountDownLatch是一个一次性的开关,相当于门闩上面的所有锁都打开了之后就放行所有人,只要这个门闩打开了就不会再次关上了,也就是门一直是打开的;然而CyclicBarrier就不是了,比如CyclicBarrier barrier = new CyclicBarrier(3); 表示栅栏上面有3道锁。3道锁都被打开了之后,barrier栅栏会放行一次,然后一次放行会放3个人过去,一旦放行了3个人过去之后,这个栅栏又关闭了,栅栏上面又恢复了3道锁的状态,周而复始......
我们画个图来理解下:

CyclicBarrier的大概流程就如上面图所示,假如CyclicBarrier barrier = new CyclicBarrier(3),创建一个有3道锁的栅栏。
(1)线程1最开始调用await方法减少栅栏的一道锁,但是此时栅栏上面还有2道锁,栅栏还是关闭状态,此时线程1进入等待。
(2)同理线程2调用await方法也一样,当线程3调用await方法的时候刚好把栅栏的最后一道锁解开了,此时栅栏打开了,线程3就会叫醒沉睡的线程1、线程2,让他们也一起通过。
(3)这3个线程通过了之后,栅栏重新关闭,恢复3道锁的状态,一直循环往复

3  CyclicBarrier源码分析

3.1  CyclicBarrier重要属性

先来看一下CyclicBarrier有哪些属性:

public class CyclicBarrier {
    // 内部类,批次
    private static class Generation {
        boolean broken = false;
    }
    // 注意:这里有一个ReentrantLock,说明CyclicBarrier的是基于ReentrantLock实现的
    private final ReentrantLock lock = new ReentrantLock();
    // 这里有一个condition trip,说明CyclicBarrier的实现依赖Condition
    private final Condition trip = lock.newCondition();
    // 这里就是栅栏上面有多少道锁,初始化栅栏是多少,这里的parties就是多少
    private final int parties;
    // 这里有个任务,就是栅栏开启的时候,如果这个任务不是null,则会执行这个任务
    private final Runnable barrierCommand;
    // 批次,栅栏再次关闭之后会进入下一个批次
    private Generation generation = new Generation();
    // 栅栏上面还有多少道锁,比如最开始有3道锁,现在只有1道锁,这时count=1
    private int count;
}

先说一下上面的关键属性:

lock:CyclicBarrier内部有一个ReentrantLock,调用await方法的时候需要去获取锁。
trip:这里是ReentrantLock的Condition,主要用来实现await和signal效果的,之前我们讲解Condition的底层原理的时候讲解过了,不懂的小伙伴,需要回去看一下哦。
parties:这个就是你设置栅栏上面有多少道锁了,创建的时候是多少就是多少道锁,CyclicBarrier barrier = new CyclicBarrier(3),那么这里的parties = 3;
count:这里就是当前栅栏上面还剩余多少道锁,比如parties = 3,有两个线程调用了barrier.await方法,那么锁减少了2道,那么此时count = 1,说明还剩余1道锁。
generation:这里是实现CyclicBarrier的关键,叫做批次。每次CyclicBarrier打开之后,都会换一个批次,这样CyclicBarrier就可以复用了
barrierCommand:任务,这个属性可以传也可以不传。每次栅栏打开的时候会检查这个属性是不是空,如果不是栅栏打开的时候会运行这个任务。

3.2  CyclicBarrier构造方法

看了CyclicBarrier的关键属性,我们再看一下CyclicBarrier的构造方法:

public CyclicBarrier(int parties) {
    // 这里传递的barrierAction为null,说明栅栏打开的时候没有任务需要做
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // 这里传递栅栏初始化的时候有多少道锁
    this.parties = parties;
    // 最开始剩余锁数量 等于 初始化锁数量
    this.count = parties;
    // 每次栅栏打开,都会触发一次的任务,可以不传
    this.barrierCommand = barrierAction;
}

接下来看一下CyclicBarrier的核心方法await的源码。

3.3  await方法源码

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // 这里调用了内部的dowait方法
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

3.3.1  dowait方法源码

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    // 看这里,非常关键,进来第一步,首先是获取一个锁
    final ReentrantLock lock = this.lock;
    // 获取锁,获取成功之后才能往后走
    lock.lock();
    try {
        // 获取当前自己属于哪个批次
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        // 看到这里,就是将剩余锁数量减少1道,即--count
        int index = --count;
        // 如果锁剩余数量为0,说明栅栏打开了
        if (index == 0) { // tripped
            boolean ranAction = false;
            try {
                // 传入的任务是不是null,如果不是空运行一下任务
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // 到这里,就是换一个批次,会唤醒沉睡队列的线程
                // 这里会调用condition.signalAll唤醒沉睡队列的线程
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 走到这里,说明上面的锁剩余数量index > 0
        for (;;) {
            try {
                if (!timed)
                    // 看这里,发现锁数量大于0,直接调用condition的await方法沉睡了
                    // 等待栅栏打开的时候调用condition.signalAll将它唤醒
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();
            // 走到这里,发现自己属于上一个批次,跟当前批次generation不等
            // 直接就返回index了
            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 释放锁
        lock.unlock();
    }
}

我们拿个例子边画图边分析一下运行过程,方便理解:
假如说CyclicBarrier barrier = new CyclicBarrier(3),线程1、线程2、线程3分别调用barrier.await方法,则流程如下:
(1)线程1最开始调用CyclicBarrier的await方法,执行流程如下图:

首先第一步是执行ReentrantLock的lock方法获取锁,然后将栅栏剩余锁的数量减少1,即 --count。
发现栅栏上剩余锁的数量count为2,说明栅栏没打开,则调用Condition的await方法,释放了ReentrantLock锁,进入沉睡队列等待,等待别人在栅栏打开的时候将它唤醒。

(2)我们再来看一下此时线程2调用CyclicBarrier的await方法的流程图:

线程2的执行流程跟上面的线程1是一样的,这里就不再多说了。
线程2调用CyclicBarrier的await方法之后,此时栅栏上剩余的锁数量count = 1,此时沉睡队列里面有两个线程(线程1、线程2)

(3)然后我们再继续看一下,线程3调用CyclicBarrier的await方法之后的流程图:

线程3调用CyclicBarrier的await方法之后,发现count减少为0了,此时栅栏打开了
然后查看barrierCommand栅栏打开之后的任务是否为null,如果不为null,则运行一下任务
然后调用nextGeneration方法,源码如下:

private void nextGeneration() {
    // 唤醒上面因为调用condition.await方法而进入沉睡队列的线程1、线程2
    trip.signalAll();
    // 重新设置栅栏上锁的数量count = parties
    count = parties;
    // 进入下一个批次了
    generation = new Generation();
}

调用trip.signalAll()即Condtion.signalAll方法唤醒,因为调用Condition.await方法而进入沉睡队列的线程1、线程2
然后重新设置一下栅栏上的锁数量count = parties;
然后生成下一个批次,generation = new Generation()

上面就是CyclicBarrier的await方法的核心源码流程了,CyclicBarrier的最核心的方法就是await方法了。

4  小结

其实就是整个可复用的计数器开关,比如最开始设置开关数量为3,到0的时候就打开开关。为了并发安全考虑,所以需要用到锁,同时为了实现阻塞唤醒的用到了Condtion。
每次调用await方法,先获取锁,然后计数器减少1,看看计数器是否为0,如果不是0,则释放锁,进入沉睡队列等待一下。如果是0了,则唤醒一下之前睡着的哥们,说开关打开了,我们一起通过吧。
为了实现复用的效果,这里会将计数器count进行复位,以及创建新的批次开启新一代,有理解不对的地方欢迎指正哈。