从ReentrantLock角度解剖AQS----未完成,持续更新中。。

发布时间 2023-05-31 13:30:17作者: Alpari
AQS重要属性
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	// 内部类node
	static final class Node {
		// 等待状态
		volatile int waitStatus;
		// 前结点
		volatile Node prev;
		// 后结点
		volatile Node next;
		// 当前线程
		volatile Thread thread;
	}
	// 头结点
	private transient volatile Node head;
	// 尾结点
	private transient volatile Node tail;
	// 状态
	private volatile int state;
}
1、创建
// 构造方法创建对象,空或false创建非公平锁,true创建公平锁
ReentrantLock lock = new ReentrantLock();

public ReentrantLock(boolean fair) {
	sync = fair ? new FairSync() : new NonfairSync();
}
2、lock();方法
// 1.加锁
lock.lock();
// 2.内部调用的是sync.lock();方法
public void lock() {
	sync.lock();
}
// 3.sync为ReentrantLock内部类,其继承AQS
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {}

// 4.再往下看此处lock方法为NonfairSync非公平锁的实现
final void lock() {
	// 4.1 compareAndSetState比较并设置状态(AQS中重要的属性之一  private volatile int state; 代表锁的状态)期望若为0,则更新成1,底层调用的是unsafe.compareAndSwapInt()内部方法,加锁成功返回true
	if (compareAndSetState(0, 1))
		// 如果加锁成功,则将当前线程设置为其拥有线程
		setExclusiveOwnerThread(Thread.currentThread());
	else
		// 否则调用acquire方法,看下面 5. 分析
		acquire(1);
}

// 5.acquire方法(3个重要方法)--arg==1
public final void acquire(int arg) {
	// 5.1 tryAcquire(arg)
	// 5.2 addWaiter(Node.EXCLUSIVE)
	// 5.3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
	if (!tryAcquire(arg) &&
		acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
		selfInterrupt();
}

// 5.1 tryAcquire(arg) --arg==1
// 是AQS的模板方法,强制子类去实现,否则直接报错
protected boolean tryAcquire(int arg) {
	throw new UnsupportedOperationException();
}
// 这个是非公平锁的实现,如果当前没有线程或与之前线程是同一线程,则true,否则false(说明已存在线程,需要等待)
protected final boolean tryAcquire(int acquires) {
	return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
	// 获取当前线程
	final Thread current = Thread.currentThread();
	// AQS中重要的状态
	int c = getState();
	// 若状态为0则表示无线程抢占
	if (c == 0) {
		// 则设置当前线程并更改状态,和 4.1一致,返回true
		if (compareAndSetState(0, acquires)) {
			setExclusiveOwnerThread(current);
			return true;
		}
	}
	// 否则代表有线程了,判断和之前的线程是否是同一个,此处说明ReentrantLock为可重入锁
	else if (current == getExclusiveOwnerThread()) {
		// 同一线程则锁加1,返回true
		int nextc = c + acquires;
		if (nextc < 0) // overflow
			throw new Error("Maximum lock count exceeded");
		setState(nextc);
		return true;
	}
	return false;
}

// 5.2 addWaiter(Node.EXCLUSIVE) -- EXCLUSIVE = null
private Node addWaiter(Node mode) {
	Node node = new Node(Thread.currentThread(), mode);
	// Try the fast path of enq; backup to full enq on failure
	// node头=尾
	Node pred = tail;
	// 若头不为空,则当前node的前结点设为头结点
	if (pred != null) {
		node.prev = pred;
		// 5.2.1 compareAndSetTail 设置尾结点,头结点的后指针指向当前结点(即双向结点设置完毕),底层调用native的unsafe方法
		if (compareAndSetTail(pred, node)) {
			pred.next = node;
			return node;
		}
	}
	// 若头结点为空,则 5.2.2 enq(node);
	enq(node);
	return node;
}

// 5.2.2 enq(node); --若头结点为空
private Node enq(final Node node) {
	// 自旋
	for (;;) {
		Node t = tail;
		// 若头结点为空,则必须先初始化头结点
		if (t == null) { // Must initialize
			// unsafe方法设置头结点--new Node()初始化
			if (compareAndSetHead(new Node()))
				tail = head;
		} else {
			// 头结点不为空,则当前结点的前指针指向头结点
			node.prev = t;
			// 设置尾结点,头结点的后指针指向当前结点
			if (compareAndSetTail(t, node)) {
				t.next = node;
				return t;
			}
		}
	}
}

// 5.3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) --arg==1
final boolean acquireQueued(final Node node, int arg) {
	boolean failed = true;
	try {
		boolean interrupted = false;
		// 自旋
		for (;;) {
			// 获取当前结点的前结点
			final Node p = node.predecessor();
			// 若前结点为头结点,继续尝试获取锁
			if (p == head && tryAcquire(arg)) {
				// 抢占锁成功后则队列中不需要了
				setHead(node);
				p.next = null; // help GC
				failed = false;
				return interrupted;
			}
			// 否则尝试入队 5.3.1 shouldParkAfterFailedAcquire(p, node)
			// 5.3.2 parkAndCheckInterrupt()
			if (shouldParkAfterFailedAcquire(p, node) &&
				parkAndCheckInterrupt())
				interrupted = true;
		}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}

// 否则尝试入队 5.3.1 shouldParkAfterFailedAcquire(p, node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	// 获取头结点的等待状态 waitStatus重要属性
	int ws = pred.waitStatus;
	// -1,处于锁状态,等待唤醒
	if (ws == Node.SIGNAL)
		return true;
	if (ws > 0) {
		do {
			node.prev = pred = pred.prev;
		} while (pred.waitStatus > 0);
		pred.next = node;
	} else {
		compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
	}
	return false;
}

// 5.3.2 parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
	// 将当前线程锁住
	LockSupport.park(this);
	return Thread.interrupted();
}
3、lock.unlock()
// 1.unlock()
lock.unlock();
// 2.调用的是 sync.release(1);
public void unlock() {
	sync.release(1);
}
// 3.release(int arg)
public final boolean release(int arg) {
	if (tryRelease(arg)) {
		Node h = head;
		if (h != null && h.waitStatus != 0)
			unparkSuccessor(h);
		return true;
	}
	return false;
}

// tryRelease(int releases)
protected final boolean tryRelease(int releases) {
	int c = getState() - releases;
	if (Thread.currentThread() != getExclusiveOwnerThread())
		throw new IllegalMonitorStateException();
	boolean free = false;
	if (c == 0) {
		free = true;
		setExclusiveOwnerThread(null);
	}
	setState(c);
	return free;
}

// unparkSuccessor(h)
private void unparkSuccessor(Node node) {
	/*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
	int ws = node.waitStatus;
	if (ws < 0)
		compareAndSetWaitStatus(node, ws, 0);

	/*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
	Node s = node.next;
	if (s == null || s.waitStatus > 0) {
		s = null;
		for (Node t = tail; t != null && t != node; t = t.prev)
			if (t.waitStatus <= 0)
				s = t;
	}
	if (s != null)
		LockSupport.unpark(s.thread);
}