【Java 并发】【九】【AQS】【三】基于AQS的共享锁实现、底层源码深度剖析

发布时间 2023-04-06 08:02:02作者: 酷酷-

1  前言

上一节我们详细讲解了基于AQS实现的互斥锁机制,进行了深入的剖析,包括从acquire入口源码开始,剖析了获取锁失败调用addWaiter方法加入等待队列,知道了Node节点是怎么插入等待队列的;同时还剖析acquireQueue方法的源码,解析了插入等待队列之后的节点什么时候被挂起,什么时候会再去被唤醒然后再去竞争锁;同时还剖析了释放锁release的源码,释放资源之后将等待队列中的第二节点唤醒,然后竞争锁,那么这节我们来看看共享锁的实现过程又是怎么样的。

2  acquireShared 方法源码解析

AQS提供共享锁获取和共享锁释放的入口方法分别为acquireShared(int arg)、releaseShared(int arg);我们一个个来分析,首先从acquireShared的源码开始。

共享锁入口,acquireShared方法源码:

public final void acquireShared(int arg) {
    // 1. tryAcquireShared < 0 表示获取共享锁失败
    if (tryAcquireShared(arg) < 0)
        // 2. 然后进入doAcquireShared方法
        doAcquireShared(arg);
}

首先acquireShared也是一个模板方法,里面定义了一个模板流程:

(1)首先第一步还是会调用子类的tryAcquireShared方法,尝试去获取共享锁,如果返回结果大于等于0,则获取共享锁成功,返回结果小于0,获取共享锁失败。
(2)获取失败会进入doAcquireShared方法,这里就是获取共享锁失败之后,进入等待队列阻塞等待,或者再次尝试获取锁的细节了。doAcquireShared方法的源码我们后面再分析

这里的tryAcquireShared方法的返回值,表示如果获取了arg个资源之后当前还剩下多少个共享资源:

  • 当 r > 0 的时候,表示获取了arg个资源之后,还有资源剩余,剩余资源大于0个,说明资源充足,获取锁成功;
  • 当 r == 0的时候,表示获取了arg个资源之后,剩余资源为0个,表示成功获取arg资源之后没有剩余的了,刚刚够你需要的arg个资源,获取锁成功;
  • 当 r < 0的时候,表示资源根本不够,如果你要获取arg个资源之后,剩下就是小于0了,不够,说明给你获取资源失败了,获取锁失败。

我们举个例子:

比如说当前的剩余共享资源就只有5个,也就是此时state = 5,此时线程调用 r = tryAcquireShared(int arg)方法去获取资源,可能有下面的几种结果:

(1)如果arg = 3,则 r = tryAcquireShard(3), r 的结果是 5 - 3 = 2,r > 0 说明还有2个资源剩余,获取锁成功
(2)如果arg = 5,则 r = tryAcquireShard(5),r的结果是 5 - 5 = 0,r = 0说明资源刚刚够你需要的,没有剩余的了,此时获取锁成功
(3)如果arg = 6,则 r = tryAcquireShard(6),r的结果是 5 - 6 = -1,你需要6个资源,这里剩余只有5个,不够你获取,r < 0 说明你获取资源失败了,即获取锁失败

2.1  入队和等待,doAcquireShared方法源码

说完 acquireShared 那我们具体看一下 doAcquireShared 方法,即进入等待队列阻塞等待,或者再次尝试获取锁,具体的代码逻辑:

private void doAcquireShared(int arg) {
    // 将当先线程封装成一个Node节点,节点模式为共享锁模式SHARED
    // 这里的addWaiter方法的源码,上一节将互斥锁的时候讲过了,一模一样的
    final Node node = addWaiter(Node.SHARED);
    // 获取锁成功还是失败的标识
    boolean failed = true;
    try {
        // 是否被中断标识
        boolean interrupted = false;
        for (;;) {
            // 获取节点node的前一个节点p
            final Node p = node.predecessor();
            // 如果p是head,则node自己则是第二节点
            // 上一节我们就说过了哈,AQS规定第二节点是可以去争抢锁的
            if (p == head) {
                // 这里调用子类的tryAcquireShared方法去尝试获取锁
                // r >= 0 表示获取锁成功,r < 0 表示获取锁失败
                int r = tryAcquireShared(arg);
                // 如果 r >= 0,获取锁成功
                if (r >= 0) {
                    // 这个时候可能锁还有剩余,需要在传播下去
                    // 就是把等待队列中的节点,让他们也去获取锁
                    // 嘿,兄弟,起来了,还有锁资源,赶紧去争抢把 这里就看出和互斥锁不一样了,互斥锁只有setHead没有传播,共享锁是需要传播的
                    setHeadAndPropagate(node, r);
                    // 将头节点的next设置为null,也就是移除头节点了
                    p.next = null; // help GC
                    // 如果interrupted为true,表示线程被中断了
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 走到这里,说明上面争抢锁失败了,判断是不是要挂起,源码上一节剖析过了
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 这里的parkAndCheckInterrupted是将线程挂起,源码上一节剖析过了
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 这里是获取锁失败,将自己插入等待队列的节点删除
        // 源码在上一节剖析过了
        if (failed)
            cancelAcquire(node);
    }
}

其实大致逻辑跟上一节互斥锁的思路大体一致都,当你看懂了上一章AQS互斥锁机制之后再来看共享锁的源码,其实就很简单了。

上面的源码步骤,我们来捋一下:

(1)首先获取当前线程node节点的前驱节点,也就是上一个节点p
(2)如果p是head节点,则说明自己是第二节点,可以去争抢锁了(AQS规定了第二节点是等待队列中下一个能获取到锁的节点,这个上一节我们讲过了哦)
(3)然后调用子类的tryAcquireShared方法去争抢锁,r表示争抢共享锁的结果。r >= 0 表示获取锁成功,r < 0表示获取锁失败
(4)如果 r >= 0 表示获取锁成功,由于共享锁可以是多个线程持有,这个时候可能锁资源还有剩余,就需要调用setHeadAndPropagate方法传播下去,告诉下面的节点,资源可能还有,赶紧起来争抢啦,别睡觉了
(5)如果上面获取锁失败,则调用shouldParkAfterFailedAcquire 方法判断自己是否应该被挂起,这里的流程和源码我们上一节分析过了
(6)如果自己应该被挂起,则调用parkAndCheckInterrupted方法将自己挂起,自己就停在这里了
(7)别的线程进行锁传播的时候,将自己唤醒,从(6)挂起的地方继续运行,重复上面的(1)、(2)、(3)、(4)、(5)、(6)步骤。

然后根据上面的源码,我们再画个图来分析一下:

2.2  共享锁传播,setHeadAndPropagate方法源码

其实可以看到和互斥锁唯一不一样的地方就是 setHeadAndPropagate 方法,互斥锁那里只有 setHead,因为互斥锁不需要传播,而共享锁不一样,共享锁某个线程发现剩余还有资源的话是会传播的,我们看下该方法的源码:

private void setHeadAndPropagate(Node node, int propagate) {
    // 获取头节点
    Node h = head;
    // 这里就是将自己设置成head节点
    // 所以后面资源传播的过程,只要资源还充足,head节点是不断变化的
    // 因为这里只要线程获取了资源成功,就把自己设置成头节点
    setHead(node);
    // propagate就是上面我们调用r = tryAcquireShared的返回值
    // 也就是剩余资源的个数,如果propagate > 0 说明资源还剩余,继续传播
    // head == null这里说的是head节点是空,说明头节点已经获取资源完毕
    // 可能head应该释放资源了,所以可能这个时候需要去唤醒别的线程
    // h.waitSatatus < 0 说明head节点后面有线程在等待你唤醒,
    // head节点表示当前已经获取了资源的线程,这个时候你需要把后面的节点唤醒
    // 让head后面节点的去竞争资源
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 如果下一个节点是共享锁模式,就唤醒继续传播
        if (s == null || s.isShared())
            // 这里的doReleaseShared方法其实就是唤醒和传播的具体逻辑了
            doReleaseShared();
    }
}

上面的核心源码的步骤大概如下:

(1)首先第一步获取head头节点,然后将自己node设置成头节点,因为自己已经获取资源成功了。而头节点表示获取了资源成功的节点
(2)然后判断propagate是否大于0,大于0说明还有资源剩余,应该继续传播
(3)判断head == null,head节点已经没了,说明等待队列队列头节点已经释放资源了,可能这时候资源又有空余了
(4)判断head.waitStatus < 0 说明后面还有人等你唤醒,需要唤醒后面的节点让他们竞争资源了

2.3  实际传播逻辑,doReleaseShared方法源码

接着我们来看看doReleaseShared方法的源码:

private void doReleaseShared() {
    for (;;) {
        // 获取头节点
        Node h = head;
        // head != null && head != null说明等待队列还有其他的节点
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 如果是SINGAL(-1),说明需要唤醒自己的下一个节点
            // 这里我们上一节讲过了SINGAL就是一个信号,
            // 这个信号表示你需要唤醒你的下一个节点
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // loop to recheck cases
                // 这里就是唤醒h的下一个节点了,源码我们上一节分析过了
                unparkSuccessor(h);
            }
            // 设置已经在传播资源了
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue; // loop on failed CAS
        }
        
        // 资源在传播的过程中,如果有新的节点获取到线程,会把自己设置成head
        // 则只要资源还有足够的剩余,说明head是不断变化的
        // 如果h == head说明头节点不再变化,也就还没有新的节点获取资源了
        // 这个时候就传播结束了,退出循环
        if (h == head) // loop if head changed
            break;
    }
}

上面的源码就是共享资源传播的核心源码,大致过程就是:

(1)首先进来先获取一下head节点,然后如果head节点的等待状态是-1,则唤醒一下head节点的下一个节点,也就是第二节点,让第二节点来竞争锁
(2)如果资源充足的话,第二节点获取到锁之后,将自己设置成head节点,所以只要资源充足这里的head节点是一直会不断的传递下去的(也就是说第三节点、第四、第五节点在获取锁之后也将自己设置成head节点)
(3)然后第二节点head节点之后,继续唤醒自己的下一个节点,也就是会继续调用unparkSuccessor方法唤醒沉睡线程,唤醒第二节点的下一个节点
(4)当h == head的时候,说明head节点不再发生变化了,说明资源已经获取完毕了,没有资源可以传播了,这个时候就不再需要再唤醒线程了,也就是break跳出循环了。

我们举个例子,来加强理解一下:

假如最开始有6个共享资源:

(1)首先head节点成功获取3个资源,此时还剩下3个资源,发现资源有剩余,则唤醒下一个节点,也就是第二节点

(2)第二节点被唤醒之后,发现自己是head的下一个节点,然后去竞争资源,自己此时需要2个资源,获取资源之后,将自己设置成head节点,此时head节点移动,指向第二节点。同时发现还有1个资源剩余,继续唤醒第三节点

 

(3)此时第三节点醒来了,发现自己的head的下一个节点(也就是自己是等待队列的第二个节点),然后去竞争资源,争抢资源成功,然后设置自己是头节点。同时发现剩余资源是0个了,但是此时它还是会唤醒自己的下一个节点,也就是唤醒第四节点,因为下一个能获取到资源的就是第四节点了,需要提前一点醒来,需要唤醒让它去竞争。

(4)第四节点醒来之后,发现自己的head的下一个节点,然后去竞争资源,但是此时居然给我返回-2,说明没资源了,害~,没办法,第四节点只能不断的去尝试了,此时传播到这里就结束了

讲到这里其实AQS中的acquireShared方法内部的全部机制、全部源码都讲解完了,下面我们来看释放。

3  共享锁释放,releaseShared方法源码

我们来看下方法的源码:

public final boolean releaseShared(int arg) {
    // 1. 调用子类的tryReleaseShared方法,实际去释放资源
    if (tryReleaseShared(arg)) {
        // 2. 释放资源之后竟然是调用这个,我们上面刚刚讲过
        // 3. 这里其实本质上就是传播资源,继续唤醒后面的节点来竞争资源
        // 这里刚刚讲解过,理解起来应该是简单了
        doReleaseShared();
        return true;
    }
    return false;
}

这个方法也是AQS定义的一个模板方法,有下面的模板流程:
(1)首先是调用子类的tryReleaseShared方法去释放资源,释放资源成功后进入下一步
(2)如果释放成功,调用doReleaseShared方法去传播资源,这里上面刚刚讲解过,其实就是继续唤醒后面的线程来竞争资源,直到资源不足为止获取全部线程都唤醒为止

也就是它调用子类的tryReleaseShared方法释放资源之后。居然就是调用doReleaseShared方法继续传播了,就是上面的继续唤醒后面的线程来竞争,直到资源不足。

4  acquireShared,releaseShared方法汇总

跟互斥锁一样,我们最后画个图从整体上梳理一下acquireShared获取锁以及releaseShared释放锁的全流程:

5  小结

到这里,AQS获取共享锁和释放共享锁的底层源码、核心流程全部分析完毕了,有理解不对的地方欢迎指正哈。