Condition用途
当多个线程需要访问一个共享资源时,需要给共享资源加锁。 当一个线程释放锁时,所有等待锁的线程都会尝试去获取锁。如果想只让部分等待锁的线程去获取锁时,就需要用到Condition。
整体分析
Condition具体实现在AbstractQueuedSynchronizer类中。这个类中管理了一个同步队列和N多个条件队列。
阻塞队列记录了等待获取锁的线程,头结点记录了当前正在运行的线程。
条件队列记录了由Condition.await()阻塞的线程,一个Lock可以有多个Condition,每个Condition是一个队列。
Condition是AbstractQueuedSynchronizer的一个内部类ConditionObject,所以创建的Condition对象是可以访问整个AbstractQueuedSynchronizer对象的属性的,通过这样将Condition与Lock相关联。
Condition原理介绍
Condition是AQS的内部类。每个Condition对象都包含一个条件队列 (等待队列)。
等待队列是一个FIFO的队列,在队列中的每个节点Node都包含了一个线程引用,该线程就是在Condition对象上等待的线程。
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; /** * Creates a new {@code ConditionObject} instance. */ public ConditionObject() { } }
等待队列的基本结构如下所示:
等待队列分为首节点和尾节点。
当一个线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列。新增节点就是将尾部节点指向新增的节点。节点引用更新本来就是在获取锁以后的操作,所以不需要CAS保证。同时也是线程安全的操作。
当一个线程调用signal()将条件队列中的第一个节点加入到同步队列的尾端,表示可以被唤醒执行。
await()
如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 将当前线程包装成一个Node节点,并节点状态为condition,值为-2 Node node = addConditionWaiter(); // 释放当前线程持有的所有锁 long savedState = fullyRelease(node); int interruptMode = 0; // 判断当前线程是否在lock的等待队列中,即在head->tail队列中,如果不在,那就是还在condition的等待队列中,阻塞当前线程。 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 当前线程执行了signal方法会经过这个,即重新将当前线程加入同步队列中 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } final long fullyRelease(Node node) { boolean failed = true; try { // 获取当前线程状态值,即持有了几个锁 long savedState = getState(); // 释放锁,最后执行到ReentrantLock的tryRelease()方法,该段代码会判断当前线程是与持有锁的线程是同一个线程,如果不是,则抛异常 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) // 抛异常了,则将此节点状态改为canceled,等待从队列中移除 node.waitStatus = Node.CANCELLED; } } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
signal()
public final void signal() { // 与await方法一样,如果不在lock方法内执行,则也会抛异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) //唤醒condition等待队列中线程 doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. 如果改变node节点状态失败,即该节点被取消了 */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 将该节点加入ReentrantLock等待队列中,即head->tail队列中 Node p = enq(node); int ws = p.waitStatus; // 如果节点被取消,或更改状态失败,则唤醒被阻塞的线程 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }