Condition

发布时间 2023-03-30 16:58:10作者: zhengbiyu

Condition用途

当多个线程需要访问一个共享资源时,需要给共享资源加锁。 当一个线程释放锁时,所有等待锁的线程都会尝试去获取锁。如果想只让部分等待锁的线程去获取锁时,就需要用到Condition。

 

执行wait方法后,线程会阻塞,并释放同步代码块的锁(sleep方法会持有锁),notify的方法执行,会唤醒某个线程,但是如果有多个线程执行wait方法阻塞,notify的执行只会唤醒其中某个线程,并不能指定线程唤醒,这时要使用notifyAll才能达到唤醒所有阻塞线程。这样确实有点麻烦,而condition的引入就是为了解决只唤醒执行阻塞的线程。它具有超时作用,即超过某段时间,即会自动唤醒,不会造成一直阻塞。常用的阻塞队列就是这个类实现的。

 

使用注意点,await和signal必须要在lock方法后执行,如果不执行lock方法就执行await或signal,会出现异常的。

 

head、tail是AQS的等待队列,firstWaiter、lastWaiter是condition的等待队列。

 

整体分析

Condition具体实现在AbstractQueuedSynchronizer类中。这个类中管理了一个同步队列和N多个条件队列

阻塞队列记录了等待获取锁的线程,头结点记录了当前正在运行的线程。

条件队列记录了由Condition.await()阻塞的线程,一个Lock可以有多个Condition,每个Condition是一个队列。

Condition是AbstractQueuedSynchronizer的一个内部类ConditionObject,所以创建的Condition对象是可以访问整个AbstractQueuedSynchronizer对象的属性的,通过这样将Condition与Lock相关联。

Condition原理介绍

Condition是AQS的内部类。每个Condition对象都包含一个条件队列 (等待队列)

等待队列是一个FIFO的队列,在队列中的每个节点Node都包含了一个线程引用,该线程就是在Condition对象上等待的线程。

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

        /**
         * Creates a new {@code ConditionObject} instance.
         */
        public ConditionObject() { }
}

等待队列的基本结构如下所示:

等待队列分为首节点和尾节点。

当一个线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列。新增节点就是将尾部节点指向新增的节点。节点引用更新本来就是在获取锁以后的操作,所以不需要CAS保证。同时也是线程安全的操作。

当一个线程调用signal()将条件队列中的第一个节点加入到同步队列的尾端,表示可以被唤醒执行。

await()

如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。

public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 将当前线程包装成一个Node节点,并节点状态为condition,值为-2
        Node node = addConditionWaiter();
        // 释放当前线程持有的所有锁
        long savedState = fullyRelease(node);
        int interruptMode = 0;
        // 判断当前线程是否在lock的等待队列中,即在head->tail队列中,如果不在,那就是还在condition的等待队列中,阻塞当前线程。
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // 当前线程执行了signal方法会经过这个,即重新将当前线程加入同步队列中
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
}
 final long fullyRelease(Node node) {
       boolean failed = true;
       try {
            // 获取当前线程状态值,即持有了几个锁
            long savedState = getState();
           // 释放锁,最后执行到ReentrantLock的tryRelease()方法,该段代码会判断当前线程是与持有锁的线程是同一个线程,如果不是,则抛异常
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
       } finally {
            if (failed)
                 // 抛异常了,则将此节点状态改为canceled,等待从队列中移除
                node.waitStatus = Node.CANCELLED;
      }
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

signal()

signal起的作用并不是直接唤醒线程,它的作用是把阻塞的线程从condition等待队列移到ReentrantLock等待队列中。但是之前调用await的节点在调用await时被挂起了,怎么被唤醒呢?当这个ReentrantLock下挂载的任意一个condition再触发await时,ReentrantLock等待队列中的下一个有效节点即会被唤醒。
public final void signal() {
     // 与await方法一样,如果不在lock方法内执行,则也会抛异常
     if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
     Node first = firstWaiter;
    if (first != null)
       //唤醒condition等待队列中线程
         doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
         first.nextWaiter = null;
     } while (!transferForSignal(first) &&
              (first = firstWaiter) != null);
}
 
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled. 如果改变node节点状态失败,即该节点被取消了
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // 将该节点加入ReentrantLock等待队列中,即head->tail队列中
    Node p = enq(node);
    int ws = p.waitStatus;
    // 如果节点被取消,或更改状态失败,则唤醒被阻塞的线程
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}