ReentrantLock

发布时间 2023-06-26 17:02:48作者: LycCj

ReentrantLock底层的源码分析:

本小节我们将由浅入深的讲解ReentrantLock的底层源码,其中会附带有源码的分析:

1.自己实现简易的ReentrantLock锁:

在多线程的并发的操作当中,我们需要通过锁机制来实现多个线程互斥的访问特定的资源从而避免并发下的操作问题。我们可以先来看一下ReentrantLock最基础的用法:

案例:给出用户银行取钱的案例:

package JUC.ReentrantLockTest;

import java.util.concurrent.locks.ReentrantLock;

import static java.lang.Thread.sleep;

public class Test {

    private static ReentrantLock lock = new ReentrantLock();
    //private static SelfReentrantLock lock = new SelfReentrantLock();

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                lock.lock();
                drawMoney();
                lock.unlock();
            }
        },"线程1").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                lock.lock();
                drawMoney();
                lock.unlock();
            }
        },"线程2").start();
    }

    private static void drawMoney(){
        System.out.println(Thread.currentThread().getName()+"正在取钱");
        sleep(3000);
        System.out.println(Thread.currentThread().getName()+"取完钱了");
    }

    private static void sleep(long mills){
        try {
            Thread.sleep(mills);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

这个例子会造成的结果是:

线程1正在取钱
线程1取完钱了
线程2正在取钱
线程2取完钱了
Process finished with exit code 0

很容易看见这个结果是我们想要,使用了ReentrantLock实现了多个线程互斥的访问资源。

我们也可以利用CAS自旋锁来实现一个简易的ReentrantLock:

package JUC.ReentrantLockTest;

import sun.misc.Unsafe;

import java.lang.reflect.Field;

//自己手写的ReentrateLock锁:
public class SelfReentrantLock {

    private volatile int state;
    private volatile static long stateOffset;
    private static Unsafe unsafe;
    static {
        try {
            unsafe = getUnsafe();
            stateOffset = unsafe.objectFieldOffset(
                    SelfReentrantLock.class.getDeclaredField("state"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static Unsafe getUnsafe() {
        try {
            Field unsafe = Unsafe.class.getDeclaredField("theUnsafe");
            unsafe.setAccessible(true);
            return (Unsafe) unsafe.get(null);
        } catch (Exception e) {
            return null;
        }

    }

    public void lock() {
        // 如果修改失败 就进入阻塞状态
        // 如果成功 就会将state修改成1
        while (!unsafe.compareAndSwapInt(this, stateOffset, 0, 1)) {
        }
    }

    public void unlock() {
        state = 0;
    }
}

在上面的代码中,我们通过volatile 的state变量作为共享变量,当state等于0时,当前的锁没有被线程持有,因此先到的线程可以获取响应的锁。如果一个线程持有了锁,后面的线程只能不断尝试while循环中的CAS操作,无法执行lock语句之后的操作,通过while来阻塞自身的代码向后执行。所以,我们基本实现了一个自旋锁。解锁代码是一个很简单的代码就是state=0,因为state我们设置成的是volatile类型,我们知道volatile是保证并发中的可见性的,因此,当在改变值后,有线程进行CAS判断就会在while不成立,从而可以进一步拿到锁接着执行相应的逻辑。

但是存在的问题:

1.在获取不到资源的线程会一直while,导致CPU空转轮询
2.如果存在多个线程,无法保证线程的公平性。(即公平锁)

2.深入查看ReentrantLock的源码:

1.首先会看到ReentrantLock会有公平锁与非公平锁两种实现方式:

image-20230626110327677

2.1 先看ReentrantLock的公平锁:

image-20230626110655220

进入lock加锁方法:

image-20230626110801164

我们需要重点来看if判断条件中的三个方法,这三个方法其中一定会有我们一开始自己实现简易的ReentrantLock的逻辑.

tryAcquire(arg):

尝试去加锁:

(1)如果尝试去加锁没有成功就会返回false继续执行与判断的后续判断条件。

(2)如果尝试加锁是成功的就会直接返回了,因为if条件第一个条件就不成立了。

image-20230626111307494

1.在tryAcquire公平锁的实现过程当中,会首先获取state,如果当前锁没有被占用也就是state=0,就会检查队列是否有在排队的线程,之所以要检查,是因为为了满足公平锁排队的需要。
也就是会执行hasQueuedPredecessors方法
2.如果当前的线程之前没有排队的线程,而且CAS修改state成功,就会设置锁当前持有的线程为自己,也就是会执行setExclusiveOwnerThread方法
3.如果c!=0,说明当前锁被其他线程所占有,入股当前获取锁的线程是自身,则可以重入,也就是重入锁,此时会增加state数,代表当前线程重入的次数。

注意hasQueuedPredecessors其中涉及到的AQS:

image-20230626131904546

注意AQS的数据结构中比较重要的几个属性:

//节点元素node的构成:
volatile Node prev; //指向前一个结点的指针
volatile Node next; //指向后一个结点的指针
volatile Thread thread; //当前结点代表的线程
volatile int waitStatus; //等待状态

//AQS其他的数据结构
private volatile int state;//Reentratlock中表示锁被同一个线程重入的次数
private transient Thread exclusiveOwnerThread;//标识锁是由哪个线程拥有

image-20230626132037184

接着会执行acquire的方法,如果当前锁被持有,同时也不是能重入的情况,那么tryAcquire会返回false结果,说明当前线程需要被阻塞,也就是会进入acquireQueued方法中的addWaiter方法:

image-20230626122954568

1.首先创建该节点所对应的Node对象:
Node node = new Node(Thread.currentThread(), mode); 
2.然后判断尾节点是否为空:
  (1)如果不为空,就将当前新生成的节点.prev = tail:
       node.prev = pred; 
  (2)而此时插入的逻辑中需要考虑并发的情形,如果此时两个线程都
       生成了node节点,需要使用CAS来判断,最后的tail到底指向那个node节点:
       if (compareAndSetTail(pred, node)) {
          .......
        } 
       1) 假设成功的节点插入最后:
            pred.next = node;
            return node;
       2)插入失败的节点会执行enq方法
  (3)进入enq方法:确保无论有多少个没有在compareAndSetTail成功插入节点在这个方法过后
       都会加入AQS队列当中。

image-20230626133821525

enq方法其实很简单,就是无限循环加上CAS去不断插入到成为尾节点。其中会首先保证检查当前的尾结点是否为空,如果为空就会同时创建头结点,注意此时的head节点与tail指向的是一个空的结点,这里的空指的是Node中的Thread的没有赋值,然后进入for循环的第二轮。我们来画个图,解释为什么要生成Thread=null的头节点,其实就是为了统一代码的编写:

image-20230626135425493

我们到后面会发现将线程的阻塞的方法是使用head.next来unpark下一个节点,也就是说如果在Thread1后有Thread2,就会使用head.next.next唤醒Thread2,那么统一逻辑Thread1由谁来唤醒,那么就是这个null节点。

接下来就是acquireQueued方法:
image-20230626140243484

acquireQueued方法主要是进行判断当前的线程是否需要阻塞的逻辑。如果当前线程的前一个节点(node.predecessor)是头结点,而且获取锁成功(tryAcquire),则当前的线程不需要被阻塞,并且调整当前节点为头结点;为什么要有这个逻辑?因为如果当前线程在tryAcquire中获取不到锁,则就会进行加入AQS队列,这个加入队列时候需要一定时间,假设在加入队列队列时间内锁被释放了,然后当前线程又是加入AQS的第一个节点,则需要在acquireQueued首先被唤醒。注意此时还是会调用tryAcquire方法尝试获取锁,因为并不是第一个节点就一定会获取到锁,所以需要这个判断。

image-20230626140949627

就是将head指向当前的node,然后将当前的Thread置为null空。

否则当前的线程需要在队列中阻塞等待(调用park相关的方法,并且修改waitStatus)

image-20230626141229164

修改waitStatus和park的操作是在shouldParkAfterFailedAcquire中进行的。waitStatus是一个非常重要的属性,在构建Node的时候默认初始化的值为0。当后面有一个线程排队在它后面时,后面的节点会通过prev属性找到前面的节点,并且修改waitStatus为-1(SIGNAL)。这意味着当一个线程释放锁的时候,检查自己的waitStatus,如果为SIGNAL,就需要unpark后面的线程。 parkAndCheckInterrupt就是将当前线程park.

image-20230626141440241

这就是整个加锁的逻辑。

2.2 ReentrantLock的非公平锁:

对于非公平锁而言,假设有多个线程并发访问, 他们都会立刻访问并且尝试修改State,如果state修改失败,就会调用acquire

image-20230626144158953

调用tryAcquire方法时候会调用非公平锁的实现方法:

公平锁就会查看是否需要去排队,而非公平锁就没有判断了。

image-20230626144531289

2.3 解锁:

对于公平锁与非公平锁解锁逻辑是相同的:

image-20230626150302098

image-20230626150249353

image-20230626151141783

解锁时候还记得我们的重入锁吗?这里使用的是如下代码来解锁:

int c = getStatee() - releases

对应我们实际使用时候是这样的:有几次加锁就要解锁几次

lock.lock()  //state++
lock.lock()  //state++
lock.lock()  //state++
.....
lock.unlock()  //state--
lock.unlock()  //state--
lock.unlock()  //state--

注意对于重入锁在当state还未减到0时一直返回的是值为false的free也就是说该锁还未空闲,当state=0才会改free等于true.

按照我们之前的逻辑需要干以下两件事:

1.将state改成为0,在tryrelease方法中做了.
2.然后根据waitSatus是否等于-1,需要unpark来解锁后续在AQS中的线程

image-20230626151757965

首先使用CAS将waitStatus改成0,然后拿到head.next.注意这里是head.next也就是我们前面提到为什么head头结点要赋值一个Thread等于null的结点.

(1)如果s不等于空而且waitStatus==-1就unpark:

LockSupport.unpark(s.thread);

(2)另外一个判断:

image-20230626152429868

为什么会有这个判断,是因为我们之前看到有中断的线程,如果一个线程中断,但是还加入到了AQS对立当中,那么他的waitStatus就会等于1.对于unpark方法不一定就是释放head.next结点,需要判断waitStatus属性值,注意上面的两个if判断代表的意思是第一个if当不等于null时,会去判断waitStatus.此时可能head.next的waitStatus大于0就是线程中断了,因此不需要unpark,此时会从AQS队列的末尾来判断哪一个waitStatus小于-1,然后对其释放.

完结,以上就是ReentrantLock底层的实现原理.