【Java 并发】【九】【AQS】【五】CountDownLatch计数器底层机制原理

发布时间 2023-04-07 07:42:50作者: 酷酷-

1  前言

接下来我们来看看CountDownLatch,也是基于之前讲解的AQS来实现的,建立在AQS体系之上的一个并发工具类。

2  CountDownLatch是什么

CountDownLatch类似一个有多道锁的门闩,CountDownLatch在创建的时候就指定好有多少道锁链了。
假如有个门闩 CountDownLatch latch = new CountDownLatch(5),则这个门闩上面有5道锁链,当latch == 0 的时候说明门闩上5道锁都被解开了,这时候门闩打开了。
当线程调用latch.await方法的时候,会去检查latch内锁的数量是否等于0,也就是门闩是否打开了。
如果等于0说明门闩打开了,则不会被阻塞调用线程,直接运行后面的逻辑;如果 latch > 0 比如latch == 2 说明门闩上面还有2道锁,没打开,这个时候调用latch.await方法的线程就会被阻塞。
每次调用latch.countDown方法的时候,就会去掉一道锁,所以上面latch为5的时候需要调用5次countDown方法才能去掉门闩上所有的锁,让门闩打开。

我们来写个例子,来感受一下:

/**
 * @author xjx
 * @description
 * @date 2023/4/7 6:28
 */
public class CountDownLatchTest {
    public static final CountDownLatch latch = new CountDownLatch(2);

    // 等待线程,等待门闩打开
    public static class WaitLatch extends Thread {
        @Override
        public void run() {
            try {
                // 等待门闩打开
                System.out.println(Thread.currentThread().getName() + "被门闩卡住了");
                latch.await();
                // 门闩打开的时候打印一下信息
                System.out.println("门闩打开啦," + Thread.currentThread().getName() + "通过啦");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    // 调用countDown减少门闩锁的线程
    public static class DownThread extends Thread {
        @Override
        public void run() {
            try {
                // 等3秒再去减少,让上面的WaitLatch线程先等着
                Thread.sleep(3000);
                // 减少门闩锁
                latch.countDown();
                System.out.println("释放门闩锁");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 两个等待latch打开的线程
        WaitLatch wait1 = new WaitLatch();
        WaitLatch wait2 = new WaitLatch();
        // 两个去减少latch的线程
        DownThread down1 = new DownThread();
        DownThread down2 = new DownThread();

        wait1.start();
        wait2.start();
        down1.start();
        down2.start();

        // 等待wait1、2,down1、down2线程运行结束之后,main线程再继续执行
        wait1.join();
        wait2.join();
        down1.join();
        down2.join();

        System.out.println("运行结束");
    }
}

针对上面的代码流程我们画个图出来:

最开始的时候wait1、wait2线程调用门闩的await方法,这个时候由于门闩上面还有2道锁,所以wait1、wait2被门闩卡住了,进入等待队列,阻塞等待门闩打开。
然后down1、down2线程分别调用countDown方法各自去掉门闩的一道锁,同时检查如果门闩上没锁了,则唤醒之前被门闩卡住的线程,让他们继续运行。
那么接下来我们就来看看CountDownLatch的底层源码。

2  CountDownLatch底层源码分析

CountDownLatch有一个内部类Sync,继承自AQS,重写了AQS的tryAcquireShared、tryReleaseShared方法,是一个共享锁。而CountDownLatch只是基于内部的Sync做了一层薄薄的封装而已。

接下来我们一个个看Sync实现AQS的tryAcquireShared、tryReleaseShared以及CountDownLatch基于Sync之上做的封装。

2.1  CountDownLatch构造方法

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

上面的构造方法,内部其实就是创建一个Sync同步器,同时指定Sync内部的资源state == count,这里的state其实就是门闩上锁的数量。

2.2  await方法

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

内部直接调用AQS的acquireSharedInterruptibly方法,我们继续看:

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // AQS这里调用子类的tryAcquireShared方法
    // 如果返回结果大于等于0,继续执行业务代码
    // 如果返回结果小于0,则调用doAcquireSharedInterruptibly进入AQS等待队列阻塞等待,那么根据理解我们创建一个们门闩锁出来,await后应该会给我们一个负数,这样才能阻塞等待
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
看到这里,其实就清晰了,进入到AQS的acquireSharedInterruptibly方法获取一个锁,之前我们讲过AQS的acquireShared方法,跟这里是一样的,是一个模板方法。
首先第一个调用的就是子类Sync的tryAcquireShared方法去获取资源。如果获取成功就返回了,继续执行业务代码。
如果获取失败了就调用doAcquireShardInterruptibly方法进入AQS等待队列进行等待,这里的doAcquireSharedInterruptibly的内部逻辑跟我们之前分析的doAcquireShared是一样的。
那么说到这里,就只剩下Sync的tryAcquireShared方法逻辑了,我们下边看。

2.2.1  Sync的tryAcquireShared方法

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

这里非常的简单,就是state == 0的时候返回1,其它情况均返回-1。

那这么看的话,调用CountDownLatch的await方法是不是会被阻塞其实还是看Sync的tryAcquireShared方法是否返回1,也就是state 是否等于0。state == 0就代表门闩上的锁都去掉了,所以门闩就打开了。

2.2.2  await小结

针对上面的await方法内部的逻辑,我们画个图总结一下:

大致流程就是这样:

调用CountDownLatch的await方法其实进入的是AQS的内部acquireSharedInterruptibly方法,这个是AQS内部定义的模板流程方法。
首先就是调用子类Sync的tryAcquireShared方法,也就是实际判断门闩是否打开,当state  == 0 表示门闩上没锁了,打开。
当state != 0 表示门闩上还有锁,这个时候就需要进入AQS的等待队列进行等待咯,等待门闩打开后将线程唤醒。

2.3  countDown方法

public void countDown() {
    sync.releaseShared(1);
}

这里的countDown方法直接就是调用AQS的releaseShared(1)方法,继续进入releaseShared方法:

public final boolean releaseShared(int arg) {
    // 调用子类的tryReleaseShared方法,释放锁
    if (tryReleaseShared(arg)) {
        // 如果锁完全释放了,就唤醒等待队列中沉睡的线程
        doReleaseShared();
        return true;
    }
    return false;
}

这里就是进入AQS释放共享锁的模板流程了:首先就是调用子类的tryReleaseShared方法释放锁,如果完全释放了,也就是state == 0 的时候,就调用AQS的doReleaseShared方法唤醒等待队列中等待的线程。

2.3.1  Sync的tryReleaseShared

我们看看子类Sync的tryReleaseShared的逻辑:

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        // 如果state == 0,也就是锁的数量等于0,表示门闩打开了
        if (c == 0)
            return false;
         // 这里就是将state - 1,也就是将门闩上锁的数量减少一道
        int nextc = c-1;
        // CAS操作重新设置锁的数量
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

上面就是将state的值减少1而已,表示将锁的数量减少一道。countDown内部的核心逻辑其实就是将state的数量减少1,也就是锁的数量减少1,当state == 0的时候,表示门闩已经打开,就可以调用doReleaseShared方法将AQS等待队列的线程唤醒了,表示:门闩打开了。

3  await、countDown方法汇总

老规矩,我们结合下 await、countDown方法汇总下流程,方便更好的理解:

4  小结

到这里,关于CountDownLatch的原理分析就结束了,其实感觉就是一个计数开关一样,当门闩上锁为0的时候,开关打开,其它时候关闭等待,整合了AQS,利用了AQS的等待队列阻塞等待,以及唤醒机制。有理解不对的地方欢迎指正哈。