aqs-shared

发布时间 2023-10-28 09:01:11作者: mingshan

上篇文章AQS源码分析-独占模式分析了AQS的结构以及独占模式下资源的获取与释放流程,啰嗦了AQS的基本结构和独占模式。这篇文章主要是探讨下AQS在共享模式下资源的获取与释放,同时比较下两种模式的差异(本文基于JDK11版本)。

流程分析 - 获取资源

这篇文章以CountDownLatch为例,和独占模式一样,AQS同样提供了资源的获取与释放的方法供子类进行重写,如下所示:

protected int tryAcquireShared(int arg)
protected boolean tryReleaseShared(int arg)

CountDownLatch的官方描述如下:

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

简单来说,CountDownLatch是一个同步器,用来使一个或者多个线程等待其他线程完成各自的工作后再执行。是不是像一个计数器?本来就是一个计数器,只不过支持并发操作。

注意CountDownLatch提供构造函数用来初始化需要等待的线程数量,其实就是设置AQS中state的具体值,如下:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

Sync(int count) {
    setState(count);
}

AQS#acquireSharedInterruptibly

CountDownLatch提供了await方法用来阻塞当前线程直到CountDownLatch内部计数为0。所以我们先从await方法开始:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

acquireSharedInterruptibly(1)方法是AQS类中的,源码如下:

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

从acquireSharedInterruptibly方法的实现中可以看出,该方法响应中断,如果当前线程被中断,抛出中断异常。然后调用tryAcquireShared(arg)方法,注意该方法的返回值是一个整数,这个限定了返回值的语义,如果小于0,证明获取共享资源失败,需要进入阻塞队列。我们来看看CountDownLatch提供的tryAcquireShared实现:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

从上面的方法中可以看出,调用了getState方法获取state的值,如果值为0,返回1,代表获取共享资源成功;否则返回-1,代表获取共享资源失败。

总结来说,子类在tryAcquireShared的实现上需要注意以下几点:

  1. 检查当前是否支持在共享模式下获取资源
  2. 返回值语义
    1. 如果返回值小于0,说明获取资源失败,需要进入阻塞队列
    2. 返回值等于0,说明获取资源成功,但后续的节点携带的线程无法唤醒,无法继续获取资源
    3. 返回值大于0,说明获取资源成功,但会唤醒后续节点然后尝试获取资源

AQS#doAcquireSharedInterruptibly

我们回到AQS的acquireSharedInterruptibly方法,当tryAcquireShared返回值小于0,就会执行doAcquireSharedInterruptibly(1)方法,代码如下:

/**
 * Acquires in shared interruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

在doAcquireSharedInterruptibly方法中,首先会调用addWaiter(Node.SHARED)方法,该方法在AQS源码分析-独占模式#AQS#addWaiter方法已经说过,主要作用是初始化CLH队列或将新创建的节点(携带当前线程)成功入队至队尾,最后返回该新建的节点。只不过此时的参数是Node.SHARED,代表共享模式。

接着就是一个无限for循环(这个非常常见),在循环体内,首先获取当前节点的前驱节点p,如果p为头结点,说明当前节点为阻塞队列第一个持有线程的节点,那么就调用tryAcquireShared(1)直接尝试获取资源,因为前面已经没有持有线程的节点了。如果返回值大于等于0,说明此时state的值已为0,获取资源成功了,接下来调用setHeadAndPropagate(node, r)方法,代码如下:

AQS#setHeadAndPropagate

/**
 * Sets head of queue, and checks if successor may be waiting
 * in shared mode, if so propagating if either propagate > 0 or
 * PROPAGATE status was set.
 *
 * @param node the node
 * @param propagate the return value from a tryAcquireShared
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

setHeadAndPropagate方法有两个参数,第一个参数node为已经成功获取资源的节点,第二个参数propagate为tryAcquireShared方法的返回值,注意此时进入该方法时propagate的值大于或者等于0。

首先会记录原来CLH队列旧的头结点,然后调用setHead(node)方法设置头结点,这个方法如下:

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

接下来有一个if判断,貌似很长的样子,我们来具体分析一下:

首先propagate > 0代表当前线程已经获取到了资源,并且需要唤醒后面阻塞的节点;h.waitStatus < 0 代表旧的头节点后面的节点可以被唤醒;(h = head) == null || h.waitStatus < 0 这个操作是说新的头节点后面的节点可以被唤醒,总结来说:

  1. propagate > 0代表当前线程已经获取到了资源,并且需要唤醒后面阻塞的节点
  2. 无论新旧头节点,只要其waitStatus < 0,那么其后面的节点可以被唤醒

如果上面if返回true,接着获取当前节点的后继节点,这里又会有一个判断,如果后继节点是共享模式或者现在还看不到后继的状态,则都继续唤醒后继节点中的线程。上面if返回true,接着执行doReleaseShared方法,代码如下:

AQS#doReleaseShared

/**
 * Release action for shared mode -- signals successor and ensures
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 */
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        // 获取CLH队列头节点
        Node h = head;
        // 判断头节点是否为null,h == tail 代表CLH刚进行初始化,CLH队列中并未有携带线程的节点
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 判断头节点的waitStatus是否为SIGNAL,
            // 如果为SIGNAL,该值必为其后继节点设置的,说明后继节点等待被唤醒
            if (ws == Node.SIGNAL) {
                // CAS 将头节点的waitStatus由SIGNAL设置为0, 相当于重置
                // 这个进行CAS主要是考虑release时也会调用该方法,需要并发控制
                if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 唤醒后继节点
                unparkSuccessor(h);
            } // 如果后继节点还未设置前驱节点的waitStatus为SIGNAL,代表目前无需唤醒或者不存在。
              // 那么就将头节点的waitStatus设置为PROPAGATE,代表在下次acquireShared时无条件地传播
            else if (ws == 0 &&
                     !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 如果头节点未发生变化,则代表当前尙未有其他线程获取到资源,直接退出。
        // 如果头节点已经发生变化,代表已经有线程获取到资源,
        // 那么就需要重新进入到for循环中,将唤醒传递下去
        if (h == head)                   // loop if head changed
            break;
    }
}

doReleaseShared是一个相当重要的方法,注意该方法在获取资源时会被调用,在释放资源时也会被调用。首先内部有一个无限for循环,在循环体内,会获取当前CLH队列的头节点,判断头节点是否为null,并且判断头节点和尾节点是否相等,相等代表CLH刚进行初始化,CLH队列中并未有携带线程的节点。

然后获取头节点的waitStatus,接着判断waitStatus是否为SIGNAL

如果为SIGNAL,该值必为其后继节点设置的,说明后继节点等待被唤醒,那么就利用CAS先将头节点的waitStatus设置为0,设置失败,就继续for循环;设置成功的话,就调用unparkSuccessor唤醒其后继节点,该方法已经在AQS源码分析-独占模式分析过,这里略过。如果后继节点成功获取资源,会造成head的改变,

如果不为SIGNAL,接着会进行判断当前头节点的waitStatus是否为0,如果不为0,进行执行for循环;如果为0,代表已经当前头节点的后继节点已经被唤醒。

我们回到doAcquireSharedInterruptibly方法,当setHeadAndPropagate方法执行完时,将p(头节点)的next置空,然后返回。如果p不为头节点,接下来就会将执行下面这段代码:

if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    throw new InterruptedException();

shouldParkAfterFailedAcquire方法在AQS源码分析-独占模式已经说过了,该方法的作用是在当前线程获取资源失败后是否挂起当前线程,返回true则代表可以挂起,会执行parkAndCheckInterrupt方法,源码如下:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

parkAndCheckInterrupt方法将当前线程挂起,并返回当前线程是否中断。

流程分析 - 释放资源

前面说到,在setHeadAndPropagate会调用doReleaseShared方法来唤醒节点,doReleaseShared方法当然在释放资源时也会被调用。对于CountDownLatch的countDown方法来说,入口是AQS的releaseShared方法,arg的值为1,代码如下:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

主要tryReleaseShared方法是AQS的子类实现的,在CountDownLatch中,tryReleaseShared的实现如下:

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c - 1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

发现这是一个自旋操作,在无限循环体内,首先获取state的值c,如果c的值为0,证明无资源可释放,返回false;否则计算c - 1的值nextc,然后通过CAS将state的值由c设置为nextc,如果设置成功的话,返回nextc == 0的比较结果,即如果nextc == 0,返回true,否则返回false;如果设置失败,重新进入到for循环,直至符合上面退出for循环的条件退出 tryReleaseShared 方法。总结来说,只有当state能够设置为0时,tryReleaseShared返回true,其他返回false。

回到AQS的releaseShared方法,如果tryReleaseShared返回true,则调用doReleaseShared方法,这个方法的实现在上面的获取资源也调用了,具体实现说明参考上面部分。

纵观releaseShared方法,我们可以得出结论,在CountDownLatch类中,只有当state为0时,才会调用doReleaseShared方法,唤醒后继节点以及传播唤醒动作。当然,这需要考虑tryReleaseShared的子类具体实现,抽象来说,就是只有tryReleaseShared方法返回true时,才会进行唤醒动作。

总结

AQS共享模式获取资源、释放资源流程图如下:

image

References:


title: AQS源码分析-共享模式
categories: [Java, JUC]
tags: [java, JUC, AQS]
author: Mingshan
date: 2019-2-2