AQS源码解析

发布时间 2024-01-13 17:43:52作者: Stitches

AQS 结构特性

  • 内部包含 NodeConditionObject 静态内部类,Node 用来存储没竞争到锁的线程状态、CondidtionObject 是对条件变量的封装;
  • volatile int state 变量记录锁的状态,1 表示锁被持有、0 表示锁被释放,同时对应三个方法来更改/获取锁的状态:getState()setState(int newState)compareAndSetState(int expect, int update)
  • AQS 内部维护了一个双向链表实现的线程等待队列,称为 CLH 队列;
  • AQS 支持两种模式的资源访问(独占/共享):独占模式是指同一时间只允许一个线程访问资源,例如 ReentrantLock,共享模式是指可以多线程访问资源 CountDownLatchSemaphore

AQS 中 CLH 队列结构如下:

  • 比如说此时恰好有个线程A 持有资源,持有资源的线程一定位于 Head 节点;
  • 此时另一个线程B 想要获取锁资源,但是获取锁失败,将B线程封装为 Node节点存到队列尾;
  • 线程B 被挂起,并通知线程A(将线程A 的 waitStatus 状态设置为 SIGNAL),在 A释放资源时通知其它线程;
  • 线程A 释放资源,将自身 Node 设置为 null 方便 GC回收,然后通知线程 B;
  • 线程B 尝试获取锁资源(公平锁大概率成功获取,非公平锁不一定)。

此外,AQS 还预留了一些接口给子类,由子类实现锁的释放和获取:

//尝试获取排他锁
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
//尝试释放排他锁
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
//尝试获取共享锁
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
//尝试释放共享锁
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}
//判定当前线程获得的资源是否是排他资源
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}

1、Node 静态内部常量类

static final class Node {
        /** 节点处于共享模式标记 */
        static final Node SHARED = new Node();
        /** 节点处于独占模式标记 */
        static final Node EXCLUSIVE = null;

        /** 当前节点作废,需要移除队列 */
        static final int CANCELLED =  1;
        /** 待唤醒后继节点,当前节点释放资源后一定会唤醒后继节点,因此后继节点可以安心挂起 */
        static final int SIGNAL    = -1;
        /** 当前节点线程获取了资源,但是执行过程中又主动放弃了资源,被移入 Condition 队列中(注意不是等待队列) */
        static final int CONDITION = -2;
        /**
         * Head 节点才持有,标记着连续唤醒队列中处于共享模式的节点,让他们并发获取共享资源
         */
        static final int PROPAGATE = -3;
        /** 当前节点的等待状态,标记为上述的几个值 */
        volatile int waitStatus;
        /** 前驱节点 */
        volatile Node prev;
        /** 后继节点 */
        volatile Node next;
        /** 当前节点绑定的线程 */
        volatile Thread thread;
        /** 等待条件的下一个节点 */
        Node nextWaiter;
}

Node 结构如上图所示,Node 就是绑定某一个线程,并存储该线程相关信息的结构。

2、ConditionObject 条件变量类


3、核心排他锁加锁方法

这里以 ReentrantLock.lock() 方法切入,我们最终会发现调用的还是 AQS 类的 acquire() 方法:

    // Reentrant.lock()
    public void lock() {
        sync.lock();
    }

    // FairSync.lock()
    final void lock() {
        acquire(1);
    }

    // AQS.acquire()
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquire() 方法的逻辑如下:

  • 调用由子类实现的 tryAcquire() 方法获取锁,如果成功就结束,否则失败了返回 false;
  • 如果失败了继续执行以下步骤:
    • 调用 addWaiter(Node) 方法封装当前线程对应的 Node 结构,然后通过 CAS 操作添加到队尾并修改队尾节点;
    • 调用 acquireQueued() 方法,先找到当前节点的前驱节点 PrevNode:
      • 如果 PrevNode 是队列头节点,就再执行一次 tryAcquire() 方法获取锁,因为可能前驱节点已经释放了锁资源,相当于一次重试。如果成功获取锁,就将当前节点设置为头节点并将前驱节点置空帮助 GC 回收;
      • 如果 PrevNode 不是头节点,就执行 shouldParkAfterFailedAcquire() 方法判断是否将当前节点对应的线程挂起,如果需要挂起就调用 LockSupport.park() 方法挂起线程。如何判断是否需要挂起?根据前驱节点的 waitStatus 标志,如果是 SIGNAL 就挂起,否则设置前驱节点为 SIGNAL 并等待下一次自选时挂起。
  • 调用 selfInterrupt() 方法,只有在尝试加锁失败且 acquireQueued() 方法标识为 true 时才执行。

3.1 tryAcquire() 方法

这里以 ReentrantLock.FairSync.tryAcquire() 来分析:

// ReentranLock.Sync 内部类继承自 AQS
abstract static class Sync extends AbstractQueuedSynchronizer {.....}

// FairSync 继承自 Sync
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    // 加锁操作
    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // 获取锁状态:0——未加锁;1——已有线程加锁
        int c = getState();
        if (c == 0) {
            // hasQueuedPredecessors():判断等待队列中是否有线程在排队,已有返回 true,否则 false
            // 如果没有线程排队就尝试 CAS 加锁
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                // 加锁成功,设置为当前独占线程
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 否则已经有线程获取了锁,判断是不是当前线程自身,如果是就是重入加锁,累加 state 变量
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

3.2 addWaiter() 方法

private Node addWaiter(Node mode) {
    // 构造当前线程对应的节点
    Node node = new Node(Thread.currentThread(), mode);
    // 判断队列尾是否为空,不为空 CAS 插入
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 否则队列未初始化,执行队列初始化然后插入节点
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

addWaiter() 方法最终返回成功加入队列尾的节点。

3.3 acquireQueued() 方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 注意这里会一直自旋
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            // 如果前驱节点为头节点,当前线程尝试重新获取锁
            if (p == head && tryAcquire(arg)) {
                // 成功后 CAS 设置头节点,并协助 GC
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 否则挂起当前线程,等待下一次自旋
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
            * This node has already set status asking a release
            * to signal it, so it can safely park.
            */
        // 前驱节点状态为 SIGNAL,安心挂起
        return true;
    if (ws > 0) {
        /*
            * Predecessor was cancelled. Skip over predecessors and
            * indicate retry.
            */
        // 前驱节点状态为 CANCELED,循环删除直到找到找到节点状态正常的前驱节点为止
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
            * waitStatus must be 0 or PROPAGATE.  Indicate that we
            * need a signal, but don't park yet.  Caller will need to
            * retry to make sure it cannot acquire before parking.
            */
        // 否则设置前驱节点状态为 SIGNAL,那么下一次循环就会挂起当前线程
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}


private final boolean parkAndCheckInterrupt() {
    // 挂起当前线程
    LockSupport.park(this);

    // Thread.interrupt() 检测当前线程是否可以被中断(检查中断标志),并返回一个 boolean 并清除中断状态,第二次调用时中断状态已被清除,将返回一个 false
    // interrupt() 并不中断线程,结果可能为 true/false
    return Thread.interrupted();
}

3.4 加排他锁方法总结

总结下 AQS 添加排他锁的逻辑:

  • 首先判断 CLH 队列中是否有等待资源的线程,如果没有直接 CAS 加锁,具体来说就是将当前节点设置为头节点;否则判断持有锁的线程和当前线程是不是为同一个线程,若为同一个,即重入操作,那么修改锁信息(+1)即可;
  • 反之当前线程可能需要等待,这取决于它的前驱节点的状态:
    • 若前驱节点为头节点,那么再重复尝试一次,这样能保证资源最大限度地利用。如果成功获取锁,就将头节点置空方便垃圾回收,并且当前线程设置为头节点;
    • 否则当前线程需要插入到队尾等待,但是此时线程是可能分配到CPU时间片的,所以我们还需要释放线程占有的系统资源,但是释放资源有个前提,就是前驱节点释放锁时能够唤醒我。所以自选检查前驱节点状态,如果不是 SIGNAL 就需要更新为 SIGNALSIGNAL 保证了前驱节点释放锁时一定会唤醒后继节点。这里检查的同时还清理了队列中需要丢弃的节点(CANCELLED)。

4、核心排他锁解锁方法

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

释放锁会首先尝试释放锁,如果释放成功,就唤醒后继第一个状态正常(非CANCELLED)的线程。

4.1 tryRelease() 方法

参考 ReentrantLock.Sync.tryRelease()

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 如果当前释放锁的线程不是独占锁的线程,抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        // 设置独占锁线程为空
        setExclusiveOwnerThread(null);
    }
    // 更新锁状态
    setState(c);
    return free;
}

4.2 unparkSuccessor() 方法

唤醒后继节点的方法:

private void unparkSuccessor(Node node) {
    /*
        * If status is negative (i.e., possibly needing signal) try
        * to clear in anticipation of signalling.  It is OK if this
        * fails or if status is changed by waiting thread.
        */
    // 将当前头节点的状态更新为 0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
        * Thread to unpark is held in successor, which is normally
        * just the next node.  But if cancelled or apparently null,
        * traverse backwards from tail to find the actual
        * non-cancelled successor.
        */
    // 获取后继节点,如果后继节点为空或者状态为 CANCELLED,反而从队列尾反向查找第一个状态正常的节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
    // 找到后唤醒线程,线程会进入竞争状态
        LockSupport.unpark(s.thread);
}

线程在被成功唤醒后会进入资源竞争状态,此时就对应了前面加锁时的 acquireQueued() 方法,方法中包含一个死循环,循环会一直重复调用 tryAcquire() 方法,这里举个例子方便理解:

  • 假设在 t1 时刻,线程A 获取了锁资源,线程B 也尝试获取锁,但是被线程A 占用,所以线程B被搞到了等待队列中(此时线程B 的前驱节点就是头节点也即线程A),线程B 会在acquireQueued的for(;;)中 不断自旋!
  • 如果 t2 时刻线程A 释放了锁资源,那么 unparkSuccessor() 方法会唤醒线程B 节点;
  • 接着在 t3 时刻,线程B 自旋到 if(p==head && tryAcquire(arg)) 方法时,将线程B 设置为头节点,此时 B持有锁资源;
  • 问题是如果B 的前驱节点不是头节点,参考 shouldParkAfterFailedAcquire() 方法,程序会循环向前遍历,将所有状态为 CANCELLED 的前驱节点剔除掉,这样队列中的所有节点都有很大概率是有效状态的节点。

4.3 释放锁总结

线程释放锁资源用到了一个特别的策略,在寻找第一个有效后继节点时(状态非 CANCELLED),如果第一个节点无效(null 或者 CANCELLED),就转而从队列尾开始向前查找。

相应的,队列中等待获取锁的线程会执行自旋操作,自旋操作过程中会不断清理队列中状态为 CANCELLED 的线程节点。

5、核心共享锁加锁方法

Semaphore 为例讲解 AQS 共享锁加锁过程:

// Semaphore.acquire() 加锁
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// Sync 继承 AQS类,调用 AQS类方法
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 若没有多余的剩余资源
    if (tryAcquireShared(arg) < 0)
        // 进入等待队列排队
        doAcquireSharedInterruptibly(arg);
}

// tryAcquireShared() 方法由 Semaphore 实现
protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

5.1 tryAcquireShared() 函数

加锁后返回剩余可用的锁资源数:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    // 共享锁加锁,返回加锁完毕后剩余的可用资源数
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}


final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        // 计算剩余资源是否可用
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

5.2 doAcquireSharedInterruptibly() 函数

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 当前线程节点添加到队列尾
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // 如果前驱节点是 Head,重新尝试加锁
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 当前节点成功获取锁,向下传播,判断后续节点能否获取锁资源
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 否则调整前驱节点,挂起当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

5.3 setHeadAndPropagate() 函数

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
        * Try to signal next queued node if:
        *   Propagation was indicated by caller,
        *     or was recorded (as h.waitStatus either before
        *     or after setHead) by a previous operation
        *     (note: this uses sign-check of waitStatus because
        *      PROPAGATE status may transition to SIGNAL.)
        * and
        *   The next node is waiting in shared mode,
        *     or we don't know, because it appears null
        *
        * The conservatism in both of these checks may cause
        * unnecessary wake-ups, but only when there are multiple
        * racing acquires/releases, so most need signals now or soon
        * anyway.
        */
    // 如果还有剩余锁资源或者头节点为空或头节点状态无效,尝试唤醒共享模式的后继节点
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}


private void doReleaseShared() {
    /*
        * Ensure that a release propagates, even if there are other
        * in-progress acquires/releases.  This proceeds in the usual
        * way of trying to unparkSuccessor of head if it needs
        * signal. But if it does not, status is set to PROPAGATE to
        * ensure that upon release, propagation continues.
        * Additionally, we must loop in case a new node is added
        * while we are doing this. Also, unlike other uses of
        * unparkSuccessor, we need to know if CAS to reset status
        * fails, if so rechecking.
        */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 头节点状态为 SIGNAL,说明头节点释放后一定会唤醒后继节点
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 只要头节点CAS成功改变状态为0,就一定会唤醒后继节点
                unparkSuccessor(h);
            }
            // 若头节点是初始状态,就更改状态为 PROPAGATE,以便后续传播
            else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

什么情况下节点的 waitStatus=0

  • 节点进入队列后,如果要挂起一定会先将前驱节点状态调整为 SIGNAL,若此时前驱节点还未调整完,后继节点的状态为 0;
  • 在上面 doReleaseShared() 中,如果后继节点被唤醒,前驱节点需要先通过 CAS 将状态转换为 0;

唤醒逻辑梳理下如下:

  • 判断头节点的状态,如果是 SIGNAL 说明需要唤醒后继节点;
  • 如果头节点状态为 0,则告知头节点,在释放后不仅需要唤醒后继节点,还需要不断传播下去唤醒更多共享模式的节点。