【Java 并发】【九】【AQS】【四】ReentrantLock的Condition机制原理

发布时间 2023-04-06 22:44:40作者: 酷酷-

1  前言

上一节我们深入分析了ReentrantLock公平锁、非公平锁底层的源码实现,这节就分析ReentrantLock提供的另外一个机制,Condition机制的底层实现。

2  什么是Condition,是干什么的

ReentrantLock提供的这个Condition功能,底层还是基于AQS的Condition机制的,Condition必须要配合一个锁来使用,具有的功能跟我们之前讲解过的synchronized和wait、notify/notifyAll是一样的, 实现的功能是控制多线程的行为。

什么是控制多线程的行为:能控制每个线程在什么条件下做啥事情。比如说想在并发安全的条件下,让线程1、线程2交叉打印数字1、数字2,实现打印效果类似:121212121212....这种。就是控制线程的行为。下面我给你举个例子,使用Condition控制前程的行为来打印1、2,我们来看个例子:

public class ReentrantLockConditionTest {
    // 共享变量
    private static int num = 1;
    // 互斥锁 这个例子如果去掉static会发生什么呢? 这把锁就不是同一把了,就会失效导致结果不对
    private static ReentrantLock lock = new ReentrantLock();
    // reentrantLock创建一个Condition
    private static Condition condition = lock.newCondition();

    static class OddNumberThread extends Thread {
        @Override
        public void run() {
            for (int i=0; i<10000; i++) {
                try {
                    // 由于变量value是非线程安全的,每次操作前需加锁
                    lock.lock();
                    // 当value的值不是奇数的时候,直接沉睡等待
                    while (num % 2 == 0) {
                        // 调用condition的await方法,释放锁,同时进入沉睡
                        // 等待别人调用singal/singalAll唤醒自己,然后重新获取锁
                        condition.await();
                    }
                    // 走到这里说明value是奇数,并且自己获取了锁
                    System.out.print("1");
                    // 执行value的++操作
                    num++;
                    // 唤醒调用condition.await而陷入等待的线程;这里就是唤醒线程2
                    condition.signal();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // 最后释放锁
                    lock.unlock();
                }
            }
        }
    }
    static class EvenNumberThread extends Thread {
        @Override
        public void run() {
            for (int i=0; i<10000; i++) {
                try {
                    // 获取锁
                    lock.lock();
                    // 当value的值不是偶数的时候,直接沉睡等待
                    while (num % 2 == 1) {
                        // 调用condition的await方法,释放锁,同时进入沉睡
                        // 等待别人调用singal/singalAll唤醒自己,然后重新获取锁
                        condition.await();
                    }
                    // 走到这里说明value是偶数,并且自己获取了锁
                    System.out.print("2");
                    // 执行value的++操作
                    num++;
                    // 唤醒调用condition.await而陷入等待的线程;这里就是唤醒线程2
                    condition.signal();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // 最后释放锁
                    lock.unlock();
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 创建两个线程并启动
        OddNumberThread demo1 = new OddNumberThread();
        EvenNumberThread demo2 = new EvenNumberThread();
        demo1.start();
        demo2.start();
        // 等待两个线程都执行完
        demo1.join();
        demo2.join();
    }
}

上面的例子啊,就是使用Condtion来控制线程的行为,Condition必须是配合一个锁来使用的。

(1)OddNumberThread线程获取锁之后,判断一下num是不是奇数,如果不是奇数,他就调用condition.await()方法,释放锁,同时沉睡等待(条件:只在num为奇数的时候打印)。

当被唤醒之后会重新尝试获取锁,获取锁成功之后才能继续运行。

如果是奇数的话,打印1,然后执行num++,然后调用condition.signal方法唤醒因为调用condition.await方法而陷入等待的线程(这里EvenNumberThread线程而因为调用了condition.await方法而沉睡等待,被唤醒)

(2)同样的在EvenNumberThread线程获取锁之后,如果发现num是奇数,自己也会调用condition.await方法,释放锁,沉睡等待(条件:只在num为偶数的时候打印)。

然后是偶数的话就打印2,然后执行num++操作,最后执行condition.signal唤醒因为调用condition.await方法而陷入等待的线程。

我们画个图来理解理解:

简单点来说就是通过condition的await方法和singal方法来实现线程的沉睡和唤醒功能,从而来控制线程的行为也就是做什么样的操作。

我们看到上面的例子condition的await方法和singal方法都是在获取ReentrantLock之后才调用的。是不是只有获取锁成功之后才能进行await和singal方法的调用,进行线程控制?

是的,这里的使用其实跟synchronized、wait、notify的使用是一样的,wait、notify方法也是必须在synchronized代码块里面使用的,也就是说必须获取锁之后才能调用。同样condition的使用也必须在一个锁里面。

那接下来我们就来看看内部工作的原理,首先来讲的就是Condition.await内部的机制和源码实现。

3  Condition.await方法源码

我们看一下Condition的await方法的源码,其实就是AQS中await方法的源码(又是基于AQS),如下所示:

public final void await() throws InterruptedException {
    // 如果线程被中断了,直接抛出中断异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 当前线程封装成Node节点,加入condition队列里面
    // 注意:这里是Condition队列,而不是AQS获取锁的等待队列,注意
    Node node = addConditionWaiter();
    // 这里是释放锁,完全释放锁资源,将state归于0
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 如果Node节点不在AQS获取锁的等待队列,这里一般都不会在
    while (!isOnSyncQueue(node)) {
        // 直接将线程挂起,让线程沉睡
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 走到这里说明有别的线程调用Condition.singal方法将你唤醒了
    // 这里调用AQS的acquireQueue方法,这个方法的作用之前讲过了
    // 就是将你放入AQS的等待队列里面,重新等待获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        // 这里就是删除一下无效的condition队列节点
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        // 由于等待时间可能太久了,被中断了
        reportInterruptAfterWait(interruptMode);
}

这里画个流程图,来理解一下:

如上图,就是Condition.await方法内部的大致流程了,由于Condition的await方法是直接调用AQS的await方法的,所以也是AQS的await方法内部的整体流程:

(1)第一步首先就是调用addConditionWaiter方法,将当前线程直接封装成一个Node加入Condition队列,注意:这里说的是Condition队列!!,不是AQS等待锁的等待队列!!,这里要注意,不要弄乱了!!!
(2)加入Condition队列之后,就是释放锁资源,这里fullyRelease就是完全释放锁,不管之前获取了锁多少次,这里都完全释放,将state 资源重新置为0。
(3)通过isOnSyncQueue方法判断是不是在AQS等待锁的等待队列,如果不是那就直接调用LockSupport.park方法将线程挂起,这里挂起之后需要别的线程调用Condition.singal或者Condition.singalAll方法才能将自己唤醒
(4)自己被唤醒之后,将自己加入AQS的等待队列,去等待队列重新等待锁!!!如果自己已经在AQS的等待队列了,跳出循环,然后在AQS的等待队列里面等待获取锁,注意:这里需要在AQS的等待队列里面等待锁,才能执行后面的业务逻辑方法。

从整体上其实就是加入Condition队列,然后释放锁,之后沉睡。当别的线程唤醒它之后,它要重新进入AQS获取锁的等待队列里面,只有重新获取锁成功才能执行业务逻辑方法。这里的addConditionWaiter方法、fullyRelease方法、isOnSyncQueue内部机制是怎么样的?我们接下来看。

3.1  AQS等待队列Node类数据结构回顾

首先讲的是addConditionWaiter是怎么将线程封装成Node加入Condition队列的。讲解addConditionWaiter源码之前,我们回顾下之前在AQS讲解过的Node节点的数据结构。

static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    // -2表示节点在Condition队列,等待别人唤醒
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
    // Node节点的等待状态
    volatile int waitStatus;
    // AQS队列中,Node节点指向前一个节点的指针
    volatile Node prev;
    // AQS队列中,Node节点指向后一个节点的指针
    volatile Node next;
    // 线程
    volatile Thread thread;
    // 这里啊就是Condition队列的Node节点,指向下一个节点的指针了
    Node nextWaiter;
}

其中prev、next指针是用在AQS等待队列的,分别指向AQS等待队列的前一个节点和后一个节点。然而nextWaiter使用在Condition队列的。

Condition是由单向链表构成的一个队列,nextWaiter表示当前Condtion队列节点的下一个节点。AQS内部相当于有两个队列,一个是AQS等待锁的等待队列,是一个双向链表;另一个是Condition队列,只有用到Condition机制的时候才会创建这个队列,是一个单向链表。

AQS使用一个head和tail的指针管理着AQS等待队列,那这个Condition队列又是怎么来进行管理的?一样的思路,ConditionObject也就是Condition接口的实现类,内部也是搞了两个指针来管理着Condition队列的。

ConditionObject内部的数据结构:

public class ConditionObject implements Condition, java.io.Serializable {
    // 这里就是指向Condition队列头节点的指针
    private transient Node firstWaiter;
    // 这里就是指向Condition队列尾节点的指针
    private transient Node lastWaiter;
}

它也是通过两个指针来管理Condition队列的;只不过命名不一样而已。AQS等待队列使用的头节点、尾节点指针叫做head、tail,而Condition队列使用头节点、尾节点的名字叫做firstWaiter、lastWaiter而已。

再画个图对比一下AQS获取锁的等待队列、Condition队列:

 了解了这个,那我们接下来就进入addConditionWaiter方法的源码剖析。

3.2  addConditionWaiter方法源码

private Node addConditionWaiter() {
    // 获取Condition队列的尾结点
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    // 如果Condition队列中已经有些节点的线程因为超时或者中断原因被取消了
    // 这里的unlinkCancelledWaiters方法就是删除哪些无效的节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 创建一个Node节点
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 下面就是把node插入condition队列的尾部了
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

上面就是获取一下Condition队列的尾节点,然后将当前线程封装成一个Node节点,插入Condtion队列的尾部,就完事了。当然如果发现lastWaiter节点状态不对,可能是由于超时或者中断的原因导致lastWaiter节点被取消了,此时就会调用unlinkCanceledWaiter方法删除一下无用节点。

再来看一下unlinkedCancelledWaiter方法源码:

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    // 这里就是从头结点开始遍历整个链表
    // 然后检查状态,状态不对的就删除
    // 这里就是最基础的链表遍历和删除代码了
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

这里就是一段基础的链表遍历和链表删除的操作,这里代码没啥太多的东西。

addConditionWaiter方法里面的源码都挺简单的,就是往Condition队列尾部插入一个节点,如果发现Condition队列尾节点状态不对,则遍历一下Condition队列,删除一下无用节点,就是基础的链表遍历、插入、删除操作而已。

3.3  fullyRelease方法源码

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 首先获取加锁的次数
        int savedState = getState();
        // 这里就是调用AQS里面的release方法去释放资源,
        // 之前已经分析过了release方法的源码了
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

fullyRelease方法的源码就更简单了,它就是直接调用release方法去全部释放资源,至于release方法的源码之前我们已经讲解过了。我们这里再说一下,就是先调用子类的tryRelease方法去释放资源,如果释放成功了则调用AQS的unparkSuccessor方法去唤醒等待中沉睡的线程。

3.4  isOnSyncQueued方法源码

接下来再分析一下,isOnSyncQueued方法,这个方法就是遍历一下等待队列,然后依次比较,看看节点是不是在队列里面而已,我们来看看:

final boolean isOnSyncQueue(Node node) {
    // 如果Node节点状态是CONDITION,说明肯定在CONDITION队列,不在等待队列
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 如果他的next指着有值,说明肯定在等待队里了
    if (node.next != null)
        return true;
    // 这里就是遍历整个等待队列,一个个比较
    return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    // 从尾部往前遍历,一个个对比是否等于node节点
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

isOnSyncQueued方法的源码,如果Node的next指针有值肯定在等待队列了,如果Node节点的waitStatus是CONDITION肯定就不在了,因为它会在Condition队列。然后上面的情况都不是的话就遍历整个等待队列一个个对比了。

看了addConditionWaiter、fullyRelease、isOnSyncQueue这几个方法的源码之后,我们把Condition.await方法的流程图再细化一下:

讲解到这里Condition.await方法内部的全部源码流程都讲解完了,包括调用方法之后怎么通过addConditionWaiter放入到Condition队列的尾部,然后调用fullyRelease释放锁,然后就陷入沉睡,当被唤醒之后就调用AQS的acquireQueud方法去重新竞争锁资源,获取锁成功之后才能继续执行业务代码。

4  Condition.singal方法源码

await分析完了,我们分析Condition的另外一个方法,Condition.singal是怎么唤醒Condition队列里面的线程的,其实比Condiution.await的简单很多,我们来看看。

看Condition的signal方法实现者,也就是AQS里面的signal方法源码:

public final void signal() {
    // 首先判断一下自己是不是拥有独占锁
    // 没有独占锁,不能调用signal方法,会抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 获取Condition队列的头结点firstWaiter
    Node first = firstWaiter;
    if (first != null)
        // 调用doSignal方法去唤醒
        doSignal(first);
}

我们接着看doSignal方法的源码:

private void doSignal(Node first) {
    do {
        // 这里的逻辑就是从头往后遍历Condition链表
        // 找到一个节点不是null的,然后调用唤醒,就那么简单
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        // 这里的实际唤醒逻辑在transferForSignal方法里面
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

继续看transferForSignal方法源码:

final boolean transferForSignal(Node node) {
    // 唤醒前将节点等待状态从CONDTION改为0
    // 因为后面唤醒之后还要进入等待队列去争抢锁,所以改为0也就是等待队列的初始状态
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 这里就是直接插入AQS等待队列了,之前讲解AQS的时候详细分析过enq源码
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 看,这里就是直接调用LockSupport.unpark方法将线程唤醒了
        LockSupport.unpark(node.thread);
    return true;
}

整体上来看其实就是从Condition队列里面头节点开始尝试唤醒节点。唤醒之前会插入AQS的等待队列让他们再次尝试获取锁,然后就是直接调用LockSupport.unpark方法唤醒线程了。

我们画个图来理解一下:

好了,到这里我们Condition.signal方法的源码就讲解结束了,其实还有一个Condtion.signalAll方法是唤醒Condition队列中所有等待线程的,你可以自己看一下,逻辑基本和Condition.signal一致,只是唤醒所有的而已。

5  Condition.await、signal 方法汇总

最后我们整体总结一下Condition.await、Condition.signal的核心机制:

6  小结

到这里,Condition机制的内部实现已经完全分析完了,有理解不对的地方欢迎指正哈。