从ReentrantLock 看AQS源码

发布时间 2023-04-03 22:54:30作者: loveletters

ReentrantLock简介

ReentrantLock意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁

ReentrantLock与Synchronized的区别

ReentrantLock支持公平锁和非公平锁,ReentrantLock内部有一个抽象内部类Sync 集成于 AQS, 并且ReentrantLock就是通过Sync的具体实现(FairSync,NonfairSync)来实现加解锁。

公平锁加锁过程

 public final void acquire(int arg) {
        // 条件一:!tryAcquire 尝试获取锁,获取成功返回true,获取失败返回false
        // 条件二:2.1 首先addWaiter将当前线程封装成node入队
        //。      2.2:acquireQueued 挂起当前线程  唤醒后相关的逻辑
        //        acquireQueued 返回true 表示挂起过程中被中断唤醒线程被中断唤醒过。。。false表示未被中断过
        
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }


        protected final boolean tryAcquire(int acquires) {
            // current 当前线程
            final Thread current = Thread.currentThread();
            // AQS state值
            int c = getState();
            // 条件成立:c==0表示当前的AQS处于无锁状态。。。
            //因为fairSync是公平锁 任何时候都需要检查一下队列中是否在当前线程之前有等待者
            //hasQueuedPredecessors()方法返回true表示当前线程前面有等待者,当前线程需要入队等待
            // hasQueuedPredecessors()方法返回false 表示当前线程前面没有等待者 尝试获取锁。。
            // 条件二:compareAndSetState(0, acquires)
            // 成功:说明当前线程抢占锁成功
            //失败: 说明存在竞争,且当前线程竞争失败。。。
            if (c == 0) {
                //
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    // 成功后需要设置当前线程为独占线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                //c !=0 大于0的情况 这种情况需要检查一下 当前线程是不是 独占的线程 因为ReentrantLock是可以重入的
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
        // 执行到这里:
        // cas失败 c==0时候 cas修改state时 为竞争过其他线程
        //或者 c>0 但是持有锁的线程不是当前的线程
            return false;
        }



// AQS#addWaiter Node入队
// 最终会返回当前线程包装的node
    private Node addWaiter(Node mode) {
        // 构建Node,把当前线程封装到node中
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 先尝试快速入队 如果失败了再完整入队
        //获取队尾节点 保存到pred变量中
        Node pred = tail;
        if (pred != null) {
            //当前节点指向pred(tail,队列的尾节点)
            node.prev = pred;
            // cas成功:说明node入队成功
            if (compareAndSetTail(pred, node)) {
                //pred(tail)的后置节点指向当前的node
                pred.next = node;
                return node;
            }
        }
        // 什么时候会执行到这里
        // 1.当前队列是空队列 tail==null
        // 2.cas失败 竞争入队失败 
        enq(node);
        return node;
    }

    // AQS#enq 完整入队
    private Node enq(final Node node) {
        // 自旋入队,只有当前node入队成功后才会跳出循环
        for (;;) {
            Node t = tail;
            //1. 当前队列是空队列 tail == null
            // 说明当前的 锁被占用,且当前线程可能是第一个获取锁失败的线程(当前时刻可能并发存在一批获取锁失败的线程)
            if (t == null) { // Must initialize
            // 作为当前持锁线程的第一个后继线程 需要做什么事?
            //1。 因为当前持有锁的线程(注意不是当前线程)它在获取锁的时候 直接tryAcquire成功了,没有向阻塞队列中 添加任何node,所以作为后继的线程需要初始化这个队列
            // 添加自己的node到队列中
            // cas成功,说明当前线程创建head节点成功  因为只有一个节点 所以head跟tail是同一个,然后继续下一次循环
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
            // node的前置节点指向尾节点
            // cas成功:说明更改尾部节点为当前线程node成功 此时 尾部节点就为当前节点 并返回
            // 实际上 这时候线程只是入队了 并没有被park 所以下一步 需要park已经唤醒
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

// tryAcquire里面调用的 在入队成功后执行的方法
// acquireQueued 需要做些什么事?
//1.当前节点有没有被park?没有就需要被park
//2.唤醒之后的逻辑

// 参数一:node事当前线程包装的node 并且当前时候已经入队成功了
// 参数而:当前线程抢占资源成功后 设置state值时候 会用到
final boolean acquireQueued(final Node node, int arg) {
        // true 表示当前线程抢占锁成功
        // false 表示失败 需要执行出队的逻辑(响应中断)
        boolean failed = true;
        try {
            // 当前线程是否被中断过
            boolean interrupted = false;
            // 自旋
            for (;;) {
                // 获取当前node的前置节点
                final Node p = node.predecessor();
                // 如果当前node的前置节点为head 再尝试获取锁
                if (p == head && tryAcquire(arg)) {
                // 获取锁成功后 把head节点设置为当前节点 也就是说这个队列的第一个节点永远是当前持有锁的线程所对应的节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                // 返回false表示不需要排队
                    return interrupted;
                }
                // shouldParkAfterFailedAcquire 这个方法是干嘛的?
                // 当前线程获取锁失败后是否需要被挂起 
                // 返回值:true 当前线程需要被挂起
                /// 返回值:false 当前线程不需要被挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // 挂起线程 并且返回当前线程的中断标记
                    //(唤醒:1.正常唤醒 其他线程upark了它 2. 其他线程给了它一个中断信号)
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }



/**
 * 参数一:pred 表示当前线程node的前置节点
 * 参数二:node 表示当前线程对应的node
 * 返回值:boolean true表示当前线程是需要被挂起的
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取前置节点的 状态:0 默认状态 new Node();-1 Signal状态,表示当前节点释放锁后会唤醒它的第一个后继节点;>0 表示当前节点是CAMCELED状态
        int ws = pred.waitStatus;
        // 条件成立:表示当前前置节点是一个可以唤醒当前节点的节点。即返回true
        if (ws == Node.SIGNAL)
                      return true;
        // 前置节点是CANCELED状态
        if (ws > 0) {
            do {
                // 找前置节点的waitStatus <=0的情况
                // 也是是说 CANCELED的节点后被出队
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
        // 当前node前置节点的状态就是0的这一种情况
        // 将当前线程的node的前置节点状态设置为SINGNAL 表示释放锁后需要唤醒当面线程
              compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

释放锁过程

Reentrantlock.unlock()->sync.release()

// AQS#release方法
//Reentrantlock.unlock()->sync.release()【aqs提供的release 】

   public final boolean release(int arg) {
        // 尝试释放锁 返回true表示完成释放锁
        if (tryRelease(arg)) {
        // head 什么时候会被创建呢?
        // 当前持有锁的线程未释放时,且持有锁的期间,有其他线程尝试获取锁,其他线程发现获取不到锁,而且队列是空队列此时后续的线程会为当前持有锁的线程创建一个head节点 然后自己再追加到head节点后

            Node h = head;
            //条件一:说明队列head节点已经初始化过,ReentrantLock在使用期间发生过锁竞争
            // 条件二:条件成立 说明head节点后面一定插入过node节点
            if (h != null && h.waitStatus != 0)
                // 唤醒后继节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

// Sync#tryRelease 
protected final boolean tryRelease(int releases) {
            // 减去释放的值
            int c = getState() - releases;
            // 条件成立:说明当前线程并未持有锁 直接抛出异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
            // 条件成立 说明当前线程已经完全达到释放锁的条件
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
// 唤醒当前节点的下一个节点
private void unparkSuccessor(Node node) {
        // 获取当前节点的状态
        int ws = node.waitStatus;
        if (ws < 0) // -1 Signal 改为零的原因:因为当前节点已经完成了唤醒后继节点的任务
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        //s 什么时候等于null?
        // 条件一
        //1.当前节点就是tail节点时 s==null
        //2.当新节点入队未完成的时候(1。设置新节点的pre指向pred2.cas设置新节点为tail 3.pred.next指向 新节点(这一步未完成 的时候head节点调用了upark这个方法的时候)
        // 条件二
        //成立:说明当前node节点的后继节点是取消状态是不能被唤醒的 需要找个能唤醒的唤醒
        if (s == null || s.waitStatus > 0) {
            s = null;
            // for循环查找 会找个一个距离当前node最近的一个可以被唤醒的node 可能会找不到
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
    // 如果找到合适的可以被唤醒的node就直接唤醒
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    ```