ReentrantReadWriteLock源码分析

发布时间 2023-05-08 09:07:47作者: 无虑的小猪

  ReentrantLock是互斥锁,若存在读多写少同时保证线程安全的场景,ReentrantLock效率比较低,此时需要用到ReentrantReadWriteLock。

一、ReentrantReadWriteLock介绍

  ReentrantReadWriteLock是可重入的读写锁,实现了ReadWriteLock接口,ReadWriteLock是读写锁的顶级接口,定义了readLock、writeLock方法。

// 读写锁接口
public interface ReadWriteLock {
    // 获取读锁
    Lock readLock();
    // 获取写锁
    Lock writeLock();
}

  读写锁,读写互斥、写写互斥、读读不互斥。

二、ReentrantReadWriteLock使用

 1 import java.util.concurrent.locks.ReentrantReadWriteLock;
 2 
 3 public class TestReentrantReadWriteLock {
 4 
 5     public static void main(String[] args) {
 6         // 可重入读写锁
 7         ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
 8         // 获取读锁
 9         ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
10         // 获取写锁
11         ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
12         
13         // 加写锁
14         writeLock.lock();
15         try {
16             // 写锁处理
17         }finally {
18             // 释放写锁
19             writeLock.unlock();
20         }
21                 
22         // 加读锁
23         readLock.lock();
24         try {
25             // 读锁处理
26         }finally {
27             // 释放读锁
28             readLock.unlock();
29         }
30     }
31 }

三、ReentrantReadWriteLock原理

  ReentrantReadWriteLock中既有读锁又有写锁,读写锁的状态是如何而控制的呢?AQS的state锁状态是int类型,4字节32位,ReentrantReadWriteLock将高16位作为读锁的状态,低16位作为写锁的状态。

四、ReentrantReadWriteLock源码分析

1、ReentrantReadWriteLock构造函数

 1 // 读锁  ReentrantReadWriteLock的内部类
 2 private final ReentrantReadWriteLock.ReadLock readerLock;
 3 // 写锁  ReentrantReadWriteLock的内部类
 4 private final ReentrantReadWriteLock.WriteLock writerLock;
 5 
 6 public ReentrantReadWriteLock(boolean fair) {
 7     // 根据fair 选择 公平锁 或者 非公平锁
 8     sync = fair ? new FairSync() : new NonfairSync();
 9     // 创建读锁
10     readerLock = new ReadLock(this);
11     // 创建写锁
12     writerLock = new WriteLock(this);
13 }

2、ReentrantReadWriteLock源码实现

  ReentrantReadWriteLock持有内部类如下:

1、公平锁/非公平锁

  ReentrantReadWriteLock的公平锁、非公平锁基于抽象内部类Sync实现的,两者主要区别在于是读写是否等待的实现不同。

1.1、writerShouldBlock

 1 // 非公平锁,写,直接尝试获取写锁
 2 final boolean writerShouldBlock() {
 3     return false;
 4 }
 5 
 6 // 公平锁,写, 判断队列中是否阻塞的读/写线程等待唤醒抢锁资源
 7 // 若有等待线程,返回true
 8 final boolean writerShouldBlock() {    
 9     return hasQueuedPredecessors();
10 } 

  等待队列中是否有其他读/写线程等待获取锁资源:

 1 // 判断队列中是否已有等待获取锁资源的其他读/写线程
 2 public final boolean hasQueuedPredecessors() {
 3     // 等待队列尾节点
 4     Node t = tail;
 5     // 等待队列头节点
 6     Node h = head;
 7     Node s;
 8     // 等待队列中有其他读/写线程在等待获取锁资源
 9     return h != t &&
10         ((s = h.next) == null || s.thread != Thread.currentThread());
11 }

1.2、readerShouldBlock

1 // 非公平锁,读,判断队列头结点是否是写锁,若有写锁,当前读线程应排队
2 final boolean readerShouldBlock() {
3     return apparentlyFirstQueuedIsExclusive();
4 }
5 
6 // 公平锁,读,判断等待队列中是否有其他线程在排队
7 final boolean readerShouldBlock() {
8     return hasQueuedPredecessors();
9 }

1.3、总结

  公平锁与非公平锁的主要区别在于:公平锁无论是获取读锁还是写锁,优先判断等待队列中是否有等待获取锁资源的Node节点,若有,当前线程等待;非公平锁,直接尝试获取写锁,获取读锁时判断队列首个Node节点是否为写锁,若为写锁,当前读线程等待。

2、读锁/写锁

  读锁、写锁构造函数如下:

1 // 读锁构造函数
2 protected ReadLock(ReentrantReadWriteLock lock) {
3     sync = lock.sync;
4 }
5 
6 // 写锁构造函数
7 protected WriteLock(ReentrantReadWriteLock lock) {
8     sync = lock.sync;
9 }

2.1、写锁

  为方便描述,下面用AQS表示抽象队列同步器AbstractQueuedSynchronizer。

2.1.1、写锁加锁

  ReentrantReadWriteLock$WriteLock#lock() 核心代码:

public void lock() {
    sync.acquire(1);
}

  获取读写锁中的写锁,AbstractQueuedSynchronizer#acquire() 核心代码:

1 public final void acquire(int arg) {
2     // 获取锁资源
3     if (!tryAcquire(arg) &&
4         // 获取锁资源失败,将当前线程节点加入等待队列,判断是否阻塞线程
5         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
6         selfInterrupt();
7 }

  AbstractQueuedSynchronizer的addWaiter、acquireQueued方法在ReentrantLock源码分析中已做详细介绍,此处不再赘述。下面来重点看看ReentrantReadWriteLock的tryAcquire方法:

 1 protected final boolean tryAcquire(int acquires) {
 2     // 获取当前线程
 3     Thread current = Thread.currentThread();
 4     // 获取AQS的读写锁状态 state
 5     int c = getState();
 6     // 获取 state 低16位 写锁状态
 7     int w = exclusiveCount(c);
 8     // state != 0, AQS读写锁状态不为0
 9     if (c != 0) {
10         // 写锁状态为0 ,持有所资源线程不为当前线程,获取锁资源失败
11         if (w == 0 || current != getExclusiveOwnerThread())
12             return false;
13         // 重入次数溢出,抛出异常
14         if (w + exclusiveCount(acquires) > MAX_COUNT)
15             throw new Error("Maximum lock count exceeded");
16         // 设置的 state 状态, 重入次数 + 1
17         setState(c + acquires);
18         // 返回true,获取锁资源成功
19         return true;
20     }
21     // 写锁获取锁资源是否需要等待
22     // (公平锁 - 判断等待队列中是否已经有线程在等待,有线程等待当前线程加入等待队列;非公平锁 - 直接尝试抢锁)
23     if (writerShouldBlock() ||
24         // CAS 抢锁资源
25         !compareAndSetState(c, c + acquires))
26         // 抢锁资源失败
27         return false;
28     // 抢锁资源成功,设置线程属性为当前线程
29     setExclusiveOwnerThread(current);
30     return true;
31 }

  读锁或写锁的状态值大于0,持有锁资源的线程不为当前线程,获取锁资源失败;

  读锁或写锁的状态值溢出,抛出异常,获取锁资源失败;

  获取锁资源成功,更新读锁或写锁的状态值,并设置线程属性为当前线程。

2.1.2、写锁释放锁

  ReentrantReadWriteLock$WriteLock#unlock() 核心代码:

public void unlock() {
    sync.release(1);
}

  释放读写锁中的写锁,AbstractQueuedSynchronizer#release() 核心代码:

 1 public final boolean release(int arg) {
 2     // 释放锁资源
 3     if (tryRelease(arg)) {
 4         // 等待队列头节点不为空,并且等待队列中有挂起的节点待唤醒
 5         Node h = head;
 6         if (h != null && h.waitStatus != 0)
 7             // 唤醒后继挂起线程  LockSupport.unpark
 8             unparkSuccessor(h);
 9         // 锁资源释放成功,返回true
10         return true;
11     }
12     // 锁资源释放失败,返回false
13     return false;
14 }

  释放锁资源,ReentrantReadWriteLock#tryRelease() 核心代码:

 1 protected final boolean tryRelease(int releases) {
 2     // 当前线程是否持有锁资源判断,不满足,抛异常
 3     if (!isHeldExclusively())
 4         throw new IllegalMonitorStateException();
 5     // 释放写锁的目标状态值,因为写锁状态是低16位,可以直接更新用state做操作
 6     int nextc = getState() - releases;
 7     // state低16位是否为0
 8     boolean free = exclusiveCount(nextc) == 0;
 9     // state低16位为0,表示当前线程彻底释放锁资源,线程属性设置null,当前线程不再持有锁
10     if (free)
11         setExclusiveOwnerThread(null);
12     // state低16位为0,更新写锁状态state,当前线程仍然持有锁资源
13     setState(nextc);
14     return free;
15 }
16 
17 // 持有锁资源的线程是否为当前线程
18 protected final boolean isHeldExclusively() {
19     return getExclusiveOwnerThread() == Thread.currentThread();
20 }

  持有锁的线程不为当前线程,抛出异常;

  写锁状态值修改,若修改后的写锁状态 - AQS的state低16位为0,当前线程彻底释放锁资源,不再持有锁,持有锁的线程属性设置为null,更新AQS的state值;若修改后的写锁状态- AQS的state低16位大于0,当前线程扔持有锁,调整当前线程的重入次数。

2.2、读锁

2.2.1、读锁 加锁

  ReentrantReadWriteLock$ReadLock#lock() 核心代码:

public void lock() {
    // 获取读锁
    sync.acquireShared(1);
}

  读写锁中的读锁获取锁真正的方法,AbstractQueuedSynchronizer#acquireShared() 核心代码:

1 public final void acquireShared(int arg) {
2     // 尝试获取锁资源,
3     if (tryAcquireShared(arg) < 0)
4         // 未获取到锁资源,排队处理
5         doAcquireShared(arg);
6 }
1、tryAcquireShared()

  尝试获取锁资源,ReentrantReadWriteLock#tryAcquireShared() 核心代码:

 1 // 最后一个获取锁资源的读线程对象
 2 private transient HoldCounter cachedHoldCounter;
 3 // 线程持有的重入次数  继承自ThreadLocal
 4 private transient ThreadLocalHoldCounter readHolds;
 5 // 存储第一个获取读锁的线程对象
 6 private transient Thread firstReader = null;
 7 // 存储第一个获取读锁的线程的重入次数
 8 private transient int firstReaderHoldCount;
 9 
10 protected final int tryAcquireShared(int unused) {
11     // 获取当前线程
12     Thread current = Thread.currentThread();
13     // 获取AQS的 state 状态值
14     int c = getState();
15     // 当前锁资源是否被其他写线程持有
16     if (exclusiveCount(c) != 0 &&
17         getExclusiveOwnerThread() != current)
18         // 被其他写线程持有,返回-1,需排队
19         return -1;
20     // 获取当前锁资源是否被读线程持有
21     int r = sharedCount(c);
22     // 当前读线程抢锁资源,优先判断当前读线程是否需要等待(公平锁/非公平锁实现方式不同)
23     if (!readerShouldBlock() &&
24         // 读锁是否已超出最大限制 
25         r < MAX_COUNT &&
26         // CAS 对 state 的高16 位 + 1
27         compareAndSetState(c, c + SHARED_UNIT)) {
28         // 当前读线程获取到了锁资源
29         if (r == 0) {
30             // 首个获取到锁资源的读线程存储在firstReader 线程对象中
31             firstReader = current;
32             // 读线程的重入次数设置为 1
33             firstReaderHoldCount = 1;
34         } else if (firstReader == current) {
35              // 当前线程为首个获取到所资源的读线程, firstReaderHoldCount + 1
36             firstReaderHoldCount++;
37         } else {
38             // 当前读线程第一个拿到锁资源的线程,先获取最后获取到锁资源的读线程对象cachedHoldCounter,持有重入次数、线程id
39             HoldCounter rh = cachedHoldCounter;
40             // rh == null : 第二个获取锁资源的读线程 或者 最后获取锁资源的读线程不是当前线程
41             if (rh == null || rh.tid != getThreadId(current))
42                 // 从ThreadLocal中 获取 HoldCounter 读线程对象
43                 cachedHoldCounter = rh = readHolds.get();
44             // 当前线程的重入次数为0,将当前线程的 HoldCounter 设置到线程变量ThreadLocal中
45             else if (rh.count == 0)
46                 readHolds.set(rh);
47             // 重入次数 + 1
48             rh.count++;
49         }
50         // 获取锁资源成功,返回1
51         return 1;
52     }
53     // 未拿到锁,未返回-1 的处理
54     return fullTryAcquireShared(current);
55 }

  ReentrantReadWriteLock#fullTryAcquireShared() 核心代码:

 1 // cas获取锁失败、重入读未被tryAcquireShared处理的,执行此方法
 2 final int fullTryAcquireShared(Thread current) {
 3     
 4     HoldCounter rh = null;
 5     for (;;) {
 6         // 获取AQS的state
 7         int c = getState();
 8         // 当前锁资源是否被其他写线程持有
 9         if (exclusiveCount(c) != 0) {
10             if (getExclusiveOwnerThread() != current)
11                 return -1;
12         // 当前读线程是否需要等待, 公平锁:等待队列中有排队的;等待队列中的head的next为写锁
13         } else if (readerShouldBlock()) {
14             // 当前线程为第一个获取读锁的线程,什么都不处理
15             if (firstReader == current) {
16                 
17             // 当前线程不为第一个获取读锁的线程
18             } else {
19                 if (rh == null) {
20                     // 获取最后一个获取锁资源的读线程
21                     rh = cachedHoldCounter;
22                     // 当前线程为第二个获取读锁的 或者 当前线程不是最后一个获取读锁的
23                     if (rh == null || rh.tid != getThreadId(current)) {
24                         // 获取去当前线程的  HoldCounter 对象,方便获取重入次数
25                         rh = readHolds.get();
26                         // 当前线程重入次数为0,不为重入操作
27                         if (rh.count == 0)
28                             //  将我的TL中的值移除掉,不移除会造成内存泄漏。用于处理JDK1.5的内存泄漏问题
29                             readHolds.remove();
30                     }
31                 }
32                 // 当前线程重入次数为0,需排队
33                 if (rh.count == 0)
34                     return -1;
35             }
36         }
37         // 超过读锁的最大值,抛出异常
38         if (sharedCount(c) == MAX_COUNT)
39             throw new Error("Maximum lock count exceeded");
40         // CAS 竞争锁资源,逻辑同tryAcquireShared
41         if (compareAndSetState(c, c + SHARED_UNIT)) {
42             if (sharedCount(c) == 0) {
43                 firstReader = current;
44                 firstReaderHoldCount = 1;
45             } else if (firstReader == current) {
46                 firstReaderHoldCount++;
47             } else {
48                 if (rh == null)
49                     rh = cachedHoldCounter;
50                 if (rh == null || rh.tid != getThreadId(current))
51                     rh = readHolds.get();
52                 else if (rh.count == 0)
53                     readHolds.set(rh);
54                 rh.count++;
55                 cachedHoldCounter = rh;
56             }
57             return 1;
58         }
59     }
60 }

  1、当前锁资源是否被其他写线程持有,获取锁资源失败,返回 -1;

  2、当前锁资源被读线程持有,判断当前读线程是都需要等待,是否超出读锁的最大值线程,若都满足,通过CAS尝试获取锁

->获取锁成功:

  是否为首个获取读锁的线程,若为是,firstReader线程对象属性设置成当前线程,firstReaderHoldCount重入次数属性设置为1;

  是否为首个获取读锁的线程重入,若为是,firstReaderHoldCount重入次数属性加1;

  第二个获取锁资源的读线程 或者 最后获取锁资源的读线程不是当前线程,创建当前线程的线程变量存储    HoldCounter对象,HoldCounter对象包含重入次数与线程id,若当前线程的重入次数为0,将当前线程的HoldCounter 设置到线程变量ThreadLocal中,重入次数 + 1

->获取锁失败

  再次尝试获取锁资源,解决内存泄露问题。

  ThreadLocalHoldCounter继承自ThreadLocal,通过ThreadLocalHoldCounter#get()方法,调用initialValue()方法,完成HoldCounter对象的初始化,进而完成thread与HoldCounter的绑定,有关ThreadLocal的分析,在ThreadLocal原理 已做分析,此处不再赘述。

 

1 static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
2     // 初始化
3     public HoldCounter initialValue() {
4         // 创建HoldeCounter对象
5         return new HoldCounter();
6     }
7 }

 

2、doAcquireShared()

  获取读锁失败,加入等待队列等待,AbstractQueuedSynchronizer#doAcquireShared() 核心方法:

 

 1 private void doAcquireShared(int arg) {
 2     // 将读锁加入等待队列中等待
 3     final Node node = addWaiter(Node.SHARED);
 4     // 当如队里是否成功标识
 5     boolean failed = true;
 6     try {
 7         boolean interrupted = false;
 8         for (;;) {
 9             // 获取当前读线程节点的前一节点
10             final Node p = node.predecessor();
11             // 当前节点的前一节点为等待队列的头节点head
12             if (p == head) {
13                 // 尝试获取锁资源
14                 int r = tryAcquireShared(arg);
15                 if (r >= 0) {
16                     // 获取锁资源成功,唤醒等待队列中后面需要获取读锁的线程
17                     setHeadAndPropagate(node, r);
18                     // 为了垃圾回收,当前节点的下一节点设置为null
19                     p.next = null; 
20                     if (interrupted)
21                         selfInterrupt();
22                     failed = false;
23                     return;
24                 }
25             }
26             //  是否阻塞当前线程,需要保证当前节点前面的Node的状态为-1,才能执行后面操作
27             if (shouldParkAfterFailedAcquire(p, node) &&
28                 // 阻塞当前线程,LockSupport.park
29                 parkAndCheckInterrupt())
30                 interrupted = true;
31         }
32     } finally {
33         if (failed)
34             cancelAcquire(node);
35     }
36 }
3、总结

  读锁是共享锁,同一时间可以被多个线程持有读锁。每个获取到读锁的线程,都有自己的线程变量ThreadLocal存储锁重入的次数。首个获取读锁的线程,不需要线程变量ThreadLocal记录重入次数,而是用firstReaderHoldCount变量记录重入次数。最后一个拿到读锁的线程,用HolderCounter对象存储,可避免频繁的锁重入,从线程变量ThreadLocal中获取重入次数。

2.2.2、读锁释放锁

  释放读锁 ReentrantReadWriteLock#unlock() 核心代码:

public void unlock() {
    // 释放读锁
    sync.releaseShared(1);
}

  真正释放读锁的方法,AbstractQueuedSynchronizer#releaseShared() 核心代码:

 1 public final boolean releaseShared(int arg) {
 2     // 尝试释放读锁
 3     if (tryReleaseShared(arg)) {
 4         // 唤醒等待队列中的等待线程
 5         doReleaseShared();
 6         // 无读线程持有读锁标识,返回true
 7         return true;
 8     }
 9     // 无读线程持有读锁标识,返回false
10     return false;
11 }
1、tryReleaseShared()

  尝试释放读锁,ReentrantReadWriteLock#tryReleaseShared() 核心代码:

 1 protected final boolean tryReleaseShared(int unused) {
 2     // 获取当前线程
 3     Thread current = Thread.currentThread();
 4     // 当前线程为首个获取读锁的线程
 5     if (firstReader == current) {
 6         // 若首个获取读锁的线程重入次数为1
 7         if (firstReaderHoldCount == 1)
 8             // 将首个获取锁资源的读线程对象设置为null,方便GC
 9             firstReader = null;
10         else
11             // 首个获取读锁线程的重入次数 -1 
12             firstReaderHoldCount--;
13     } else {
14         // 获取最后一个获取读锁线程的重入次数、线程id
15         HoldCounter rh = cachedHoldCounter;
16         //  当前线程不是最后一个线程
17         if (rh == null || rh.tid != getThreadId(current))
18             // 获取当前线程的线程变量中的HoldCounter
19             rh = readHolds.get();
20         // 获取当前线程的可重入次数
21         int count = rh.count;
22         // 当前线程的可重入次数小于等于 1
23         if (count <= 1) {
24             // 删除当前线程的线程变量,避免内存泄露
25             readHolds.remove();
26             // 当前线程的可重入次数小于等于 0,抛出异常
27             if (count <= 0)
28                 throw unmatchedUnlockException();
29         }
30         // 当前线程的可重入次数 -1
31         --rh.count;
32     }
33     for (;;) {
34         // 获取 AQS 的 state
35         int c = getState();
36         // 获取读锁的状态值
37         int nextc = c - SHARED_UNIT;
38         // CAS 释放读锁
39         if (compareAndSetState(c, nextc))
40             // 所有的读线程是否都释放读锁
41             return nextc == 0;
42     }
43 }
2、doReleaseShared()
 1 // 唤醒等待队列中的
 2 private void doReleaseShared() {
 3     // 循环处理,防止有新的线程加入等待队列中
 4     for (;;) {
 5         // 获取等待队列头节点
 6         Node h = head;
 7         // 头节点不为空,等待队列中有多个线程在等待
 8         if (h != null && h != tail) {
 9             // 获取头节点的等待状态
10             int ws = h.waitStatus;
11             // 当前Node节点状态为被阻塞
12             if (ws == Node.SIGNAL) {
13                 // CAS 操作,WaitStatus 由 -1 设置成 0,并唤醒阻塞的线程锁
14                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
15                     //  CAS 失败,循环更新
16                     continue;       
17                 // 唤醒等待队列中挂起的线程
18                 unparkSuccessor(h);
19             }
20             // 当前节点为已唤醒线程,WaitStatus 由 0 设置成 -3 
21             // 等待的读线程的处理
22             else if (ws == 0 &&
23                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
24                 //  CAS 失败,循环更新
25                 continue;
26         }
27         // 没有新的线程加入等待队列,结束循环
28         if (h == head)
29             break;
30     }
31 }
3、总结

  读锁的释放,根据AQS中state的高16位值判断,若为0,表示无读线程持有读锁,此时需要唤醒等待队列中的阻塞线程抢锁资源;若大于0,

-> 当前线程是首个获取读锁的线程

  首个获取读锁的线程重入次数为1,则将firstReader设置为null,首个获取读锁线程彻底释放锁资源,若首个获取读锁的线程重入次数大于1,firstReaderHoldCount减 1

-> 当前线程不是最后一个获取读锁的线程

  从当前线程的线程变量ThreadLocal中,获取HoldCounter,若可重入次数等于1,删除当前线程的线程变量,避免内存泄露,同时可重入次数减 1;若可重入次数小于等于0,抛出异常。

-> 当前线程不是最后一个获取读锁的线程

  cachedHoldCounter属性作为HoldCounter,若可重入次数等于1,删除当前线程的线程变量,避免内存泄露,同时可重入次数减 1;若可重入次数小于等于0,抛出异常。