Java AbstractQueuedSynchronizer

发布时间 2023-10-16 20:12:50作者: LARRY1024

前言

Java中的大部分同步类,如 Lock、Semaphore、ReentrantLock 等,都是基于 AbstractQueuedSynchronizer(简称为AQS)实现的。

AQS 是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。AQS 的框架如下图所示:

image

总的来说,AQS 框架共分为五层,自上而下由浅入深,从 AQS 对外暴露的 API 到底层基础数据。

CLH 锁

CLH 锁是由 Craig、Landin 和 Hagersten 的发明,因此命名为 CLH 锁。CLH 是单向链表,AQS 中的队列是 CLH 变体的虚拟双向队列(FIFO),AQS 是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。

CLH 锁是对自旋锁的一种改进,是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在节点之间的关联关系),暂时获取不到锁的线程将被加入到该队列中。

AQS 将每条请求共享资源的线程封装成一个 CLH 队列锁的一个节点(Node)来实现锁的分配。在 CLH 队列锁中,一个节点表示一个线程,它保存着线程的引用(thread)、 当前节点在队列中的状态(waitStatus)、前驱节点(prev)、后继节点(next)。

CLH 队列结构如下图所示:

image

AQS 框架

AQS 核心思想

AQS 核心思想是:

  • 如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。

  • 如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制。

    这个机制 AQS 是基于 CLH 锁实现的。

AQS 的核心原理图:

image

AQS 的同步状态

AbstractQueuedSynchronizer 使用了一个 int 成员变量 state 表示同步状态,通过内置的 FIFO 线程等待/等待队列 来完成获取资源线程的排队工作。并且,状态的获取和修改都是通过 final 修饰的,在子类中无法被重写。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    ...
    private volatile int state;  // The synchronization state.
    // 返回同步状态的当前值
    protected final int getState() {
        return state;
    }
    // 设置同步状态的值
    protected final void setState(int newState) {
    state = newState;
    }

    // 原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    ...
}

我们可以通过修改 state 字段表示的同步状态来实现多线程的独占模式和共享模式(加锁过程):

image

对于我们自定义的同步工具,需要自定义获取同步状态和释放状态的方式,也就是 AQS 架构图中的第一层:API层。

AQS 对资源的共享方式

AQS 中线程的两种锁的模式:

  • 独占(Exclusive):只有一个线程能执行,如,ReentrantLock。

    独占方式,又分为公平锁和非公平锁:

    • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁;

    • 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的。

  • 共享(Shared):多个线程可同时执行,如,Semaphore、CountDownLatch。

ReentrantReadWriteLock 可以看成是组合式,因为 ReentrantReadWriteLock 也就是读写锁允许多个线程同时对某一资源进行读。

AQS 的重要方法

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在顶层实现好了。

对于自定义同步器,需要实现以下几种方法:

方法名 描述
protected boolean isHeldExclusively() 该线程是否正在独占资源。只有用到 Condition 才需要去实现它。
protected boolean tryAcquire(int arg) 独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。
protected boolean tryRelease(int arg) 独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。
protected int tryAcquireShared(int arg) 共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected boolean tryReleaseShared(int arg) 共享方式。arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待节点返回True,否则返回False。

一般来说,自定义同步器要么是独占方法,要么是共享方式,它们也只需实现:tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared 中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如 ReentrantReadWriteLock。

AQS 的数据结构

Node

先来看下AQS中最基本的数据结构:Node,Node即为上面CLH变体队列中的节点。

// java.util.concurrent.locks.AbstractQueuedSynchronizer.Node@JDK 1.8
abstract static class Node {
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    // 当前节点在队列中的状态
    volatile int waitStatus;
    // 前驱指针
    volatile Node prev;
    // 后继指针
    volatile Node next;
    // 表示处于该节点的线程
    volatile Thread thread;
    // 指向下一个处于CONDITION状态的节点
    Node nextWaiter;
    ...
}

其中,节点的状态 waitStatus 有下面几个枚举值:

枚举变量 枚举值 含义
- 0 一个新节点入队的默认状态,表示当前节点在 sync queue 中,等待着获取锁。
CANCELLED 1 表示当前节点已取消调度。当 timeout 或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的节点将不会再变化。
CONDITION -2 表示节点等待在 Condition 上,当其他线程调用了 Condition 的 signal() 方法后,CONDITION 状态的节点将从等待队列转移到同步队列中,等待获取同步锁。
PROPAGATE -3 共享模式下,前继节点不仅会唤醒其后继节点,同时也可能会唤醒后继的后继节点。
SIGNAL -1 表示后继节点在等待当前节点唤醒。后继节点入队时,会将前继节点的状态更新为 SIGNAL。

注意,负值表示节点处于有效等待状态,而正值表示节点已被取消,因此,源码中很多地方用 ws > 0ws < 0 来判断节点的状态是否正常。

ConditionObject

Condition

// java.util.concurrent.locks.Condition
public interface Condition {
    // 等待,当前线程在接到信号或被中断之前一直处于等待状态
    void await() throws InterruptedException;
    // 等待,当前线程在接到信号之前一直处于等待状态,不响应中断
    void awaitUninterruptibly();
    //等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态 
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    // 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
    boolean awaitUntil(Date deadline) throws InterruptedException;
    // 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。
    void signal();
    // 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。
    void signalAll();
}

ConditionObject

ConditionObject 实现了 Condition 接口,Condition 接口定义了条件操作规范

// java/util/concurrent/locks/AbstractQueuedSynchronizer.java
class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    public class ConditionObject implements Condition, java.io.Serializable {
        // condition 队列的头节点
        private transient Node firstWaiter;
        // condition 队列的尾节点
        private transient Node lastWaiter;
        ...
    }
    ...
}

AQS 源码分析

AbstractQueuedSynchronizer 的源码如下所示:

// java/util/concurrent/locks/AbstractQueuedSynchronizer.java
class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    ...
    // 头节点
    private transient volatile Node head;
    // 尾节点
    private transient volatile Node tail;
    // 状态
    private volatile int state;
    // 自旋时间
    static final long spinForTimeoutThreshold = 1000L;

    // Unsafe类实例
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    // state内存偏移地址
    private static final long stateOffset;
    // head内存偏移地址
    private static final long headOffset;
    // state内存偏移地址
    private static final long tailOffset;
    // tail内存偏移地址
    private static final long waitStatusOffset;
    // next内存偏移地址
    private static final long nextOffset;
    // 静态初始化块
    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    ...
}

核心方法

acquire 方法

acquire() 方法的源码如下:

// java/util/concurrent/locks/AbstractQueuedSynchronizer.java
class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    ...
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
}

acquire() 方法以独占模式获取(资源),忽略中断,即线程在 aquire 过程中,中断此线程是无效的。

当一个线程调用 acquire 时,调用方法流程如下:

  • 首先调用 tryAcquire 方法,调用此方法的线程会试图在独占模式下获取对象状态。

    此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。在 AbstractQueuedSynchronizer 源码中默认会抛出一个异常,即需要子类去重写此方法完成自己的逻辑。

  • 如果 tryAcquire 失败,则

    • 调用 addWaiter 方法,addWaiter 方法完成的功能是将调用此方法的线程封装成为一个节点并放入 Sync queue。

    • 调用 acquireQueued 方法,此方法完成的功能是 Sync queue 中的节点不断尝试获取资源,若成功则返回 true,否则,返回 false。

image

addWaiter

addWaiter 将线程封装为节点的源码如下:

class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    ...
    private Node addWaiter(Node mode) {
        // 新生成一个节点,默认为独占模式
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail; // 保存尾节点
        if (pred != null) { // 尾节点不为空,即已经被初始化
            node.prev = pred; // 将 node 节点的prev域连接到尾节点
            // 比较 pred 是否为尾节点,是则将尾节点设置为node
            if (compareAndSetTail(pred, node)) {
                pred.next = node;  // 设置尾节点的 next 域为 node
                return node; // 返回新生成的节点
            }
        }
        // 尾节点为空(即还没有被初始化过),或者是 compareAndSetTail 操作失败,则入队列
        enq(node);
        return node;
    }

    private Node enq(final Node node) {
        for (;;) { // 无限循环,确保节点能够成功入队列
            Node t = tail;  // 保存尾节点
            if (t == null) {  // 尾节点为空,即还没被初始化
                if (compareAndSetHead(new Node()))  // 头节点为空,并设置头节点为新生成的节点
                    tail = head; // 头节点与尾节点都指向同一个新生节点
            } else { // 尾节点不为空,即已经被初始化过
                node.prev = t; // 将 node 节点的 prev 域连接到尾节点
                if (compareAndSetTail(t, node)) { // 比较节点 t 是否为尾节点,若是则将尾节点设置为 node
                    t.next = node; // 设置尾节点的 next 域为 node
                    return t; // 返回尾节点
                }
            }
        }
    }
    ...
}

addWaiter 方法使用快速添加的方式往 sync queue 尾部添加节点,如果 sync queue 队列还没有初始化,则会使用 enq 插入队列中,enq 方法会使用无限循环来确保节点的成功插入。

acquireQueue

如果线程获取资源(锁)失败,说明线程已经被添加到等待队列尾部了。acquireQueued() 方法可以对排队中的线程进行“获锁”操作。

总的来说,一个线程获取锁失败了,被放入等待队列,acquireQueued会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。

acquireQueue 方法的源码如下:

class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    ...
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true; // 标记是否成功拿到资源
        try {
            boolean interrupted = false; // 标记等待过程中是否中断过
            for (;;) { // 开始自旋,要么获取锁,要么中断
                final Node p = node.predecessor(); // 获取当前节点的前驱节点
                if (p == head && tryAcquire(arg)) { // 如果 p 是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(注意,头结点是虚节点)
                    setHead(node); // 获取锁成功,头指针移动到当前node
                    p.next = null; // help GC
                    failed = false; // 设置标志
                    return interrupted;
                }
                // 说明 p 为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是 p 不为头结点,这个时候就要判断当前 node 是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    // 靠前驱节点判断当前线程是否应该被阻塞
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;   // 获取头结点的节点状态
        if (ws == Node.SIGNAL) // 说明头结点处于唤醒状态
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true; // 可以进行 park 操作
        if (ws > 0) { // 表示状态为:取消状态
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do { // 循环向前查找取消节点,把取消节点从队列中剔除
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else { // 设置前任节点等待状态为SIGNAL
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    // 主要用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。
    private final boolean parkAndCheckInterrupt() {
        // 在许可可用之前禁用当前线程,并且设置了 blocker
        LockSupport.park(this);
        return Thread.interrupted(); // 当前线程是否已被中断,并清除中断标记位
    }
    // 取消继续获取(资源)
    private void doAcquireInterruptibly(long arg) throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    // 释放后继节点
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus; // 获取node节点的等待状态
        if (ws < 0) // 状态值小于0,为SIGNAL = -1 或 CONDITION = -2 或 PROPAGATE -3
            compareAndSetWaitStatus(node, ws, 0); // 比较并且设置节点等待状态,设置为0

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next; // 获取node节点的下一个节点
        if (s == null || s.waitStatus > 0) { // 下一个节点为空或者下一个节点的等待状态大于0,即为 CANCELLED
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev) // 从尾节点开始从后往前开始遍历
                if (t.waitStatus <= 0) // 找到等待状态小于等于 0 的节点,找到最前的状态小于等于 0 的节点
                    s = t; // 保存节点
        }
        if (s != null) // 该节点不为为空,释放许可
            LockSupport.unpark(s.thread);
    }
    ...
}

acquireQueued() 方法的流程图如下:

image

其中,shouldParkAfterFailedAcquire() 方法的流程图如下:

image

调用 acquireQueue 方法,可以使 sync queue 中的节点在独占且忽略中断的模式下获取(资源)。

只有当该节点的前驱节点的状态为 SIGNAL 时,才可以对该节点所封装的线程进行 park 操作。否则,将不能进行 park 操作。parkAndCheckInterrupt 方法里的逻辑是首先执行 park 操作,即禁用当前线程,然后返回该线程是否已经被中断。

release

release() 方法以独占模式释放对象,其源码如下:

class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    ...
    public final boolean release(long arg) {
        if (tryRelease(arg)) {  // 释放成功
            Node h = head;  // 保存头节点
            if (h != null && h.waitStatus != 0) // 头节点不为空并且头节点状态不为0
                unparkSuccessor(h); // 释放头节点的后继节点
            return true;
        }
        return false;
    }
    ...
}

其中,tryRelease 的默认实现是抛出异常,需要具体的子类实现,如果tryRelease成功时,如果头节点不为空并且头节点的状态不为 0,则调用 unparkSuccessor 方法释放头节点的后继节点。

AbstractQueuedSynchronizer总结

对于 AbstractQueuedSynchronizer 的分析,最核心的就是 sync queue 的分析。

  • 每一个节点都是由前一个节点唤醒;

  • 当节点发现前驱节点是 head 并且尝试获取成功,则会轮到该线程运行;

  • condition queue中的节点向sync queue中转移是通过signal操作完成的;

  • 当节点的状态为SIGNAL时,表示后面的节点需要运行。


参考: