Redisson 实现分布式锁

发布时间 2023-10-18 16:02:01作者: MyDistance

Redisson 实现分布式锁

分布式锁的应用场景有哪些?实现的方式有哪几种?Redisson 又是怎么实现的?

1、应用场景、特点及实现方式

1.1、分布式锁的应用场景

主要有以下两类:

  1. 提升处理效率:避免重复任务的执行,减少系统资源的浪费(例如幂等场景)。
  2. 保障数据一致性:在多个微服务并发访问时,避免出现访问数据不一致的情况,造成数据丢失更新等情况。

以下是不同客户端并发访问时的场景:

1

1.2、分布式锁的特点

分布式锁主要有以下几个特点:

  1. 独占性:同一时刻只有一个线程能够持有锁。
  2. **可重入:同一个线程能够重复获取已获得的锁。
  3. 超时:在获得锁之后限制锁的有效时间,避免资源无法释放而造成死锁。
  4. 高可用:有良好的获取锁与释放锁的功能,避免分布式锁失效。

1.3、分布式锁的实现方式

目前主流的实现方式有以下几种:

  • 基于数据库(例如基于 CAS 的乐观锁)。
  • 基于 Redis。
  • 基于 zookeeper(不只具有服务注册与发现的功能)。
  • 基于 etcd。

本篇讲解是基于 Redis 的方式去实现分布式锁,具体实现用到的是 Redisson。

2、Redisson 入门

概念:Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格。通俗来将,就是在 Redis 基础上实现的分布式工具集合。点击访问项目地址

这里以 SpringBoot 项目怎么使用 Redisson 实现分布式锁为例。

首先要做的是引入相关依赖。

2.1、引入依赖

<!--redisson-->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
</dependency>

依赖引入后下一步就是老生常谈的配置环境了。

2.2、添加配置

redisson 支持单点、主从、哨兵、集群等部署方式:

/**
 * redisson 配置
 */
@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient() {
        //单点
        Config config = new Config();
        //地址及密码
        config.useSingleServer()
            .setAddress("redis://127.0.0.1:6379").setPassword("123456");
        return Redisson.create(config);

        //主从
//        Config config = new Config();
//        config.useMasterSlaveServers()
//            .setMasterAddress("redis://127.0.0.1:6379").setPassword("123456")
//            .addSlaveAddress("redis://127.0.0.1:6389")
//            .addSlaveAddress("redis://127.0.0.1:6399");
//        return Redisson.create(config);

        //哨兵
//        Config config = new Config();
//        config.useSentinelServers()
//            .setMasterName("myMaster")
//            .addSentinelAddress("redis://127.0.0.1:6379", "redis://127.0.0.1:6389")
//            .addSentinelAddress("redis://127.0.0.1:6399");
//        return Redisson.create(config);

        //集群
//        Config config = new Config();
//        config.useClusterServers()
//                //cluster state scan interval in milliseconds
//            .setScanInterval(2000)
//            .addNodeAddress("redis://127.0.0.1:6379", "redis://127.0.0.1:6389")
//            .addNodeAddress("redis://127.0.0.1:6399");
//        return Redisson.create(config);
    }
}

配置完成之后,下一步就是编写类进行测试。

2.3、编写接口

@Autowired
private RedissonClient redissonClient;

@RequestMapping("/test")
public  void test() throws InterruptedException {
    //获取锁
    RLock lock = redissonClient.getLock("lock");
    //加锁,参数:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
    //注意:如果指定锁自动释放时间,不管业务有没有执行完,锁都不会自动延期,即没有 watch dog 机制。
    boolean isLock = lock.tryLock(1, 2, TimeUnit.SECONDS);
    try {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        if (isLock) {
            System.out.println(format.format(System.currentTimeMillis()) + "获取分布式锁成功");
            Thread.sleep(1000);
            System.out.println(format.format(System.currentTimeMillis()) + "业务完成");
        } else {
            System.out.println(format.format(System.currentTimeMillis()) + "获取分布式锁失败");
        }
    } catch (Exception e) {
        throw new RuntimeException("业务异常");
    } finally {
        //当前线程未解锁
        if (lock.isHeldByCurrentThread() && lock.isLocked()) {
            //释放锁
            System.out.println("解锁");
            lock.unlock();
        }
    }
}

分布式锁的使用分成以下 3 步:

  1. 获取锁:根据唯一的 key 去 redis 获取锁。

  2. 加锁:拿到锁后在指定的等待时间内不断尝试对其加锁,超过等待时间则加锁失败。

  3. 解锁:分成两种情形:

    • 第一如果在加锁的时候指定了自动释放时间,那么在此时间范围内业务提前完成的话就在 finally 手动释放锁,而如果业务没有完成也会自动释放锁,所以指定自动释放时间需要做非常仔细的考量;
    • 第二就是没有指定自动释放时间,由于 redisson 有 watch dog (看门狗)机制,watch dog 默认的 releaseTime 是 30s,给锁加上 30s 的自动释放时间,并且每隔 releaseTime / 3 即 10 s 去检查业务是否完成,如果没有完成重置 releaseTime 为 30 s, 即锁的续约,所以一个业务严重阻塞的话会造成系统资源的极大浪费。到这里你应该能够明白分布式锁是没有完美的解决方案的。

    纸上得来终觉浅,下面我们开始测试接口。

2.4、测试

要模拟多个线程同时获取分布式锁,这里我用到了 jmeter

3 个线程同时访问,控制台打印结果如下:

//第一个线程加锁成功
2023-09-17 15:33:19获取分布式锁成功   
2023-09-17 15:33:20业务完成
//第一个线程释放锁     
解锁
//第二个线程加锁成功    
2023-09-17 15:33:20获取分布式锁成功
//第三个线程加锁失败,第二个线程已占有锁且已过等待时间 20 - 19 = 1    
2023-09-17 15:33:20获取分布式锁失败 
2023-09-17 15:33:21业务完成
//第二个线程释放锁    
解锁

对打印结果有疑问?

首先第 1 个线程在 19 - 20 秒的时间范围内加锁,2、3 线程处于阻塞状态,

在 20 秒 1 线程释放锁后 2 线程刚好在等待时间的临界点加锁成功,3 线程就没那么好运了,在临界点抢不过 2 线程,加锁失败。

21 秒 2 线程完成业务释放锁。

根据以上业务分析 Redisson 的分布式锁有哪些特点

  1. 独占性:1 线程加锁成功后是 2、3 线程处于阻塞状态无法加锁。
  2. 超时:指定 2 秒的自动释放时间,由于 key 存放在 redis,即使服务宕机,redis 也会自动删除 key 。
  3. 高可用:1 线程和 2 线程加锁成功后能够良好的解锁(这里配置了单点,真正的高可用一般需要哨兵或集群)。

那么可重入呢?难道 Redisson 没有该特性?

不急,继续往下看。

3、Redisson 可重入

现在我们不了解Redisson 是否能够可重入,即同一个线程能否多次获得同一个锁?

既然不了解,那么直接上测试。

3.1、编写接口

/**
     * 重入方法1
     *
     * @throws InterruptedException
     */
@RequestMapping("/reentrant")
public void reentrant1() throws InterruptedException {
    //获取锁
    RLock lock = redissonClient.getLock("reentrant");
    //加锁,参数:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
    boolean isLock = lock.tryLock(10, 25, TimeUnit.SECONDS);
    try {
        if (isLock) {
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            System.out.println(format.format(System.currentTimeMillis()) + "获取分布式锁1成功");
            Thread.sleep(15000);
            //调用方法2
            reentrant2();
            System.out.println(format.format(System.currentTimeMillis()) + "业务1完成");
        }
    } catch (Exception e) {
        throw new RuntimeException("业务异常");
    } finally {
        //当前线程未解锁
        if (lock.isHeldByCurrentThread() && lock.isLocked()) {
            //释放锁
            System.out.println("分布式锁1解锁");
            lock.unlock();
        }
    }
}

/**
     * 重入方法2
     *
     * @throws InterruptedException
     */
public void reentrant2() throws InterruptedException {
    //获取锁
    RLock lock = redissonClient.getLock("reentrant");
    //加锁,参数:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
    boolean isLock = lock.tryLock(5, 25, TimeUnit.SECONDS);
    try {
        if (isLock) {
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            System.out.println(format.format(System.currentTimeMillis()) + "获取分布式锁2成功");
            Thread.sleep(10000);
            System.out.println(format.format(System.currentTimeMillis()) + "业务2完成");
        }
    } catch (Exception e) {
        throw new RuntimeException("业务异常");
    } finally {
        //当前线程未解锁
        if (lock.isHeldByCurrentThread() && lock.isLocked()) {
            //释放锁
            System.out.println("分布式锁2解锁");
            lock.unlock();
        }
    }
}

这里在方法 1 中调用方法 2,并且都尝试获取同一把锁。

3.2、验证

使用 postman 测试接口,控制台打印结果如下:

//方法1加锁
2023-09-17 17:16:01获取分布式锁1成功
//方法2获取同一把锁并加锁    
2023-09-17 17:16:16获取分布式锁2成功
2023-09-17 17:16:26业务2完成
//方法2释放锁    
分布式锁2解锁 
2023-09-17 17:16:26业务1完成
//方法1释放锁       
分布式锁1解锁

根据上面的打印结果,能够推测出 Redisson 是拥有可重入的特性的!!!

原因很简单,在方法 1 持有锁的同时,方法 2 能够再次加锁,而如果不可重入,则方法 2 肯定无法对其加锁。

另外也可以直接查看 redis 锁的具体情况:

方法 1 加锁时, value 为 1

Snipaste_2023-09-17_17-26-44

方法 2 再次加锁,value 为 2

Snipaste_2023-09-17_17-28-03

这进一步验证了上面的猜测,当方法 1 加锁时 value 为 1,方法 2 再次加锁实现了 value + 1。

释放锁的过程则相反,方法 2 释放锁时 value - 1, 方法 1 再次释放锁 value = 0,直接删除锁。

你说了那么多,我还是有点懵,你能不能画个流程出来? 我。。。。竟无语凝噎。

3.3、具体流程

Redisson 实现可重入采用 hash 的结构,在 key 的位置记录锁的名称,field 的位置记录线程标识, value 的位置则记录锁的重入次数。

加锁时,如果线程标识是自己,则锁的重入次数加 1,并重置锁的有效期。

释放锁时,重入次数减 1,并判断是否为 0,如果为 0 直接删除,否则重置锁的有效期。

具体的流程如下:

2

这样子你能明白什么是可重入了吗?明白倒是明白了,但是你这只有流程,空口无凭,有没有具体的代码实现?

???好好好,算我倒霉,第一次遇到犟驴了。。。

3.4、源码

这里我以 tryLock()方法为例。

直接点到底层运用的tryLockInnerAsync()方法, 能够看到用的是lua脚本进行加锁实现计数 + 1。

加锁源码(这里是最新的源码,不是上面依赖的 3.13.6)如下:

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command,
                        //判断锁是否存在
                "if ((redis.call('exists', KEYS[1]) == 0) " +
                            //或者锁已经存在,判断threadId是否是自己
                            "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
                        //锁次数加 1
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        //设置有效期
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        //返回结果
                        "return nil; " +
                    "end; " +
                    //没获取到锁,返回锁的剩余等待时间
                    "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

lua脚本能够保证操作的原子性,这里判断锁是否存在或者是当前线程,锁的次数加 1 并重置有效期。

反之无法加锁则返回锁的剩余等待时间。

说完了加锁,接下来说解锁,以unlock()方法为例。

解锁源码如下:

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    //判断锁是否自己持有
              "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        //不持有,直接返回
                        "return nil;" +
                    "end; " +
                    //是自己的锁,重入次数 - 1
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    //可重入次数为否为 0
                    "if (counter > 0) then " +
                        //大于0,不能释放锁,重置有效期
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                    "else " +
                        //等于0,删除锁
                        "redis.call('del', KEYS[1]); " +
                        "redis.call(ARGV[4], KEYS[2], ARGV[1]); " +
                        "return 1; " +
                    "end; " +
                    "return nil;",
                Arrays.asList(getRawName(), getChannelName()),
                LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), getSubscribeService().getPublishCommand());
    }

同样使用到了 lua 脚本,如果是自己的线程,重入次数 - 1,当可重入次数为 0 删除锁,否则重置有效期。

这下子总算是明白了。那这个锁尝试加锁是实现重试的?

4、Redisson 重试

tryLock()方法第一个参数waitTime是尝试加锁的最大等待时间,在这个时间内会不断地进行重试。

上面说到tryLockInnerAsync()方法用于执行加锁并计数,当加锁失败返回锁的剩余等待时间。

往回查看,最终返回的是RFuture<Long>的对象。

image-20230917203846394

往回看,继续返回锁的剩余等待时间。

image-20230917204138355

继续往回看,我们可知道加锁成功返回的是 null,而加锁失败返回的是锁的剩余等待时间。

当加锁的消耗时间大于锁的等待时间,返回false。

image-20230917205025748

如果等待时间还有剩余,那就继续重试,但是这里的重试不是立即重试,我们可以看到有一个 subscribe 方法,这个是订阅释放锁的消息(如果有线程释放了锁就会发一个消息过来)(释放锁时的 publish 命令就是发布消息通知,subscribe 订阅的就是它发布的通知)。

消息通知也是不确定的,所以这里返回的仍旧是一个 Future,然后通过 get 等待结果,future 在指定时间内完成会返回 true。

image-20230917212820411

如果在剩余等待时间内收到订阅通知,那么会继续计算剩余等待时间(排除掉订阅等待的时间),如果此时无剩余时间返回 false;如果剩余等待时间依然有剩余,就可以再次尝试获取锁。如果加锁成功返回 true,否则不断计算剩余等待时间。

image-20230917213312801

Redisson的重试流程大致讲解完毕。

前面好像还提到了 watch dog(看门狗),上面的加锁与解锁还有重试流程都没有体现呀!!!

好吧,继续往下看。

5、Redisson watch dog

watch dog 是指当你加锁时没有指定锁的自动释放时间时,则默认给你添加一个 30s 的自动释放时间,并且每隔 30s / 3 即 10s 去进行锁的续约,即每 10s 锁的自动释放时间重置为 30s, 直至业务完成。

5.1、源码

我们继续来分析源码,如果我们没有指定超时释放时间默认是一个看门狗时间(30s)。

image-20230917215238332

future 完成后,如果剩余有效期等于 null(获取锁成功),会调用 scheduleExpirationRenewal(自动更新续期) 方法。

image-20230917215420854

跟踪进来,首先会 new 一个 ExpirationEntry,然后把它扔到了 map 里面,这个 map 的 key 是 string 类型(id + 锁名称),值是 entry,且为 static final,即 RedissonLock 类的所有实例都可以看到这个 map,一个 Lock 类会创建出很多锁的实例,每一个锁的实例都有自己的名字(entryName),在 map 中有唯一的 key 和 唯一的 entry。第一次调用时,entry 不存在,所以使用 putIfAbsent;多次调用时,entry 是存在的,putIfAbsent 就会失效,返回旧的 entry,因此就能够保证不管锁被重入几次,拿到的永远是同一个 entry。所以,这里的 map 的作用就是保证同一个锁拿到的永远是同一个 entry

image-20230917220607662

然后将线程ID放入 entry,第一次调用时还会执行 renewExpiration (更新有效期)方法,我们可以跟踪到这个方法里面看一看。

image-20230917220756413

方法进来后,先从 map 中得到 entry,然后会执行一个定时任务(Timeout),这个任务有两个参数:一个是任务本身 task,另一个参数是延时 delay,即此任务是在 delay 时间到期后执行。我们可以看到这个延时任务会在内部锁施放时间(默认看门狗时间)的 1/3 后执行。

image-20230917221222609

那么我们可以跟踪到 renewExpirationAsync 方法看一下执行的任务是什么,显然这是一个更新有效期的操作。

image-20230917221257928

我们再返回去,可以看到 renewExpirationAsync 方法执行完后又会递归调用自身,这样一来,锁的有效期就会不断进行重置,永不过期(初始默认为30s,10s后又设置为30s ....)。

image-20230917221350997

最后把任务放到 entry 中,因此 entry 中存放了两个数据:一个是当前线程ID,一个是定时任务。从这里我们就能看出为啥前面第一次调用时会执行 renewExpiration,而后面就不会调用此方法,因为 oldEntry 中已经有了定时任务,只需要把线程 ID 加进去即可。

image-20230917221812080

由上面分析可以看出锁的有效期被无限延续,那什么时候释放锁呢?自然是在 unlock 的时候。我们继续跟踪 unlock 方法,在释放锁的时候会执行 cancelExpirationRenewal(取消更新任务) 方法。

image-20230917222405473

我们跟踪到 cancelExpirationRenewal 方法中,根据锁的名称从 map 中取出当前锁的 entry,将 ID 移除掉,然后再取出 task,将任务取消掉,最后再把 entry 移除掉,锁的释放就彻底完成了。

image-20230917222517828

watch dog 的源码也分析完毕(与3.13.6源码有些许差别)。

5.2、重试与 watchdog 流程

看完源码,自己尝试把第 4 点重试的流程与第 5 点 watch dog 的流程梳理一遍。

简化流程如下:

3

如果你有自己走了一遍源码,那相信你绝对能够画出大致的流程。如果有不懂的地方,那么源码是你最好的老师。

Redisson 的基本介绍与使用到这里告一段落了,分布式锁没有完美的解决方案,根据业务复杂度灵活配置等待时间、释放时间才是根本。

参考资料: