Redis—分布式锁

发布时间 2023-09-03 20:46:37作者: Stitches

单实例的正确实现方式

获取锁的正确操作为: SET resource_name my_random_value NX PX 30000,它限定了只有当锁空闲且持有锁的时间为30000ms,并且锁资源对应的 value 为一个随机值。设置随机值是为了在释放锁时,确保当前线程能够释放该锁,避免出现操作超时的线程释放了其它线程的锁。

释放锁时查询对比 value 和具体删除锁操作必须是一个原子操作,可以结合 LUA脚本实现,对应代码如下:

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

设置的 value 可以是任意随机值,最简单的就是 UNIX timestamp with microsecond precision

代码实例

参考:https://juejin.cn/post/7020803797211414558

// 锁接口,提供释放锁、尝试加锁、重复尝试加锁
public interface RedisLock {
    long TIMEOUT_MILLIS = 30000;

    int RETRY_MILLIS = 30000;

    long SLEEP_MILLIS = 10;

    boolean tryLock(String key);

    boolean lock(String key);

    boolean lock(String key, long expire);

    boolean lock(String key, long expire, long retryTimes);

    boolean unlock(String key);
}

// 简单抽象类
public abstract class AbstractRedisLock implements RedisLock{
    @Override
    public boolean lock(String key) {
        return lock(key, TIMEOUT_MILLIS);
    }

    @Override
    public boolean lock(String key, long expire) {
        return lock(key, TIMEOUT_MILLIS, RETRY_MILLIS);
    }
}

// 具体实现类
@Component
public class RedisLockImpl extends AbstractRedisLock {
    private Logger logger = LoggerFactory.getLogger(RedisLockImpl.class);

    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    private static final String UNLOCK_LUA;     // Lua脚本保证释放锁操作原子性

    static {
        StringBuilder builder = new StringBuilder();
        builder.append("if redis.call(\"get\",KEYS[1]) == ARGV[1]");
        builder.append("then ");
        builder.append("    return redis.call(\"del\",KEYS[1]) ");
        builder.append("else ");
        builder.append("    return 0 ");
        builder.append("end ");
        UNLOCK_LUA = builder.toString();
    }

    @Override
    public boolean tryLock(String key) {
        return tryLock(key, TIMEOUT_MILLIS);
    }

    // unblocking lock
    public boolean tryLock(String key, long expire) {
        try {
            return !StringUtils.isEmpty(redisTemplate.execute((RedisCallback<String>) connection-> {
                JedisCommands commands = (JedisCommands) connection.getNativeConnection();
                String uuid = UUID.randomUUID().toString();
                threadLocal.set(uuid);
                return commands.set(key, uuid, SetParams.setParams().nx().px(expire));
            }));
        } catch (Exception e) {
            logger.error("set redis occured an exception", e);
        }
        return false;
    }

    // blocking lock
    @Override
    public boolean lock(String key, long expire, long retryTimes) {
        boolean result = tryLock(key, expire);
        while (!result && retryTimes-- > 0) {
            try {
                logger.debug("lock failed, retrying {}......", retryTimes);
                Thread.sleep(SLEEP_MILLIS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            result = tryLock(key, expire);
        }
        return result;
    }

    @Override
    public boolean unlock(String key) {
        try {
            List<String> keys = Collections.singletonList(key);
            List<String> args = Collections.singletonList(threadLocal.get());
            Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {
                Object nativeConnection = connection.getNativeConnection();
                if (nativeConnection instanceof JedisCluster) {
                    return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
                }
                if (nativeConnection instanceof Jedis) {
                    return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
                }
                return 0L;
            });
            return result != null && result > 0;
        } catch (Exception e) {
            logger.error("unlock occurred an exception", e);
        }
        return false;
    }
}

项目依赖的资源文件 pom.xml,注意 redis.version 需要和 spring-data-redis 中的版本保持一致:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.15</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.demo</groupId>
    <artifactId>distributed_lock</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>distributed_lock</name>
    <description>distributed_lock</description>
    <properties>
        <java.version>1.8</java.version>
        <redis.version>3.8.0</redis.version>
        <spring-test.version>5.0.7</spring-test.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${redis.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

测试:

@SpringBootTest
@RunWith(SpringRunner.class)
class DistributedLockApplicationTests {
    private Logger logger = LoggerFactory.getLogger(DistributedLockApplicationTests.class);

    @Autowired
    private RedisLock redisLock;
    @Autowired
    private StringRedisTemplate redisTemplate;

    private ExecutorService executors = Executors.newScheduledThreadPool(8);

    @Test
    void contextLoads() {
    }

    @Test
    public void lock() {
        redisTemplate.opsForValue().set("goods-seckill", "10");
        List<Future> futureList = new ArrayList<>();

        // 100 threads
        for (int i = 0; i < 100; i++) {
            futureList.add(executors.submit(this::seckill));
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // waiting for result blocked
        futureList.forEach(action -> {
            try {
                action.get();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    public int seckill() {
        String key = "goods";
        try {
            redisLock.lock(key);
            int num = Integer.valueOf(Objects.requireNonNull(redisTemplate.opsForValue().get("goods-seckill")));
            if (num > 0) {
                redisTemplate.opsForValue().set("goods-seckill", String.valueOf(String.valueOf(--num)));
                logger.info("秒杀成功,剩余库存{}", num);
            } else {
                logger.error("秒杀失败,剩余库存{}", num);
            }
            return num;
        } catch (NumberFormatException e) {
            logger.error("seckill exception", e);
        } finally {
            redisLock.unlock(key);
        }
        return 0;
    }
}

单实例实现起来方便,但是不具备实际可用性,实际使用往往是 redis 集群,所以考虑 Redlock 算法。


官方文档解释

https://redis.io/docs/manual/patterns/distributed-locks/

为什么基于故障转移的实现是不够的?

实现主从的 Redis架构,但是仍不能保证在主从复制过程中锁资源的互斥性,存在以下情况:

  • Client A acquires the lock in the master.
  • The master crashes before the write to the key is transmitted to the replica.
  • The replica gets promoted to master.
  • Client B acquires the lock to the same resource A already holds a lock for. SAFETY VIOLATION!

Redlock 算法

在分布式版本中,我们假设我们有N个redis 的matser,各自独立,没有任何关系(不在一个cluster下),可以部署在不同的服务器或是虚拟机上,假设N设置成5。

某个客户端获取锁的操作:

  1. 客户端 A 获取当前时间戳;
  2. 客户端 A 尝试在所有的 N 个 master 中有序地获取锁,使用相同的 key、value;获取锁的过程由于需要遍历多个 redis 服务,可能导致阻塞,需要设置超时时间,假设锁自动释放的时间是 10s,那么这个超时时间可以设置在 5~50ms 的范围内,防止客户端在获取锁期间由于 redis 节点的崩溃导致获取锁的操作超时,如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个Redis实例请求获取锁;
  3. 客户端 A 获取到锁,再获取当前的时间戳,和步骤1 的时间相减,得到获取锁消耗的时间。在锁有效期内,当且仅当客户端A拿到大部分( N/2 + 1,这里至少3)的锁时,分布式锁才可以被正式获取;
  4. 每次当锁被获取到时(从每个master获取),有效时间可以被设置成初始有效时间减去获取锁消耗的时间;
  5. 假设步骤2中获取锁失败,没有拿到大部分锁,那么它需要把自己在少部分 redis 上拿到的锁释放掉。

失败重试

当客户端获取不到锁,它应该在之后一个随机时间点重试,这为了避免多个客户端尝试同时获取同一个资源(类似脑裂的情况,大概意思就是竞争了一堆,却发现,没人拿到锁),客户端拿到锁越快(早),脑裂的情况越小(或者重试的需要越小),所以理想情况下,客户端可以同时(多路复用)发送set命令给各个master。

释放锁

锁释放步骤很简单,就是把所有master实例上的锁释放,并不需要关心客户端在该实例上有没有成功得到锁。


Redisson 分析和使用

使用(一)、搭建多个 Redis Master

利用 docker 创建三个容器,使用 redis:5.0.7 镜像搭建多个 Redis-server,再利用端口映射实现各个 Redis 服务端 6379端口分别映射到主服务器 6379、6380、6381端口。

详细搭建步骤如下,这里以 6379的 redis服务搭建为例:

# 创建文件挂载目录
mkdir -p /opt/docker/redis/data
mkdir -p /opt/docker/redis/conf

# 配置文件也需要映射(注意修改配置文件配置才能进行远程连接)
cp redis.conf /opt/docker/redis/conf/redis.conf

# 创建容器启动 redis01-server
docker run -p 6379:6379 --name redis -v /opt/docker/redis/conf/redis.conf:/etc/redis/redis.conf -v /opt/docker/redis/data:/data -d redis:5.0.7 redis-server /etc/redis/redis.conf --appendonly yes
# 同理创建 redis02-server
docker run -p 6380:6379 --name redis02 -v /opt/docker/redis02/conf/redis.conf:/etc/redis/redis.conf -v /opt/docker/redis02/data:/data -d redis:5.0.7 redis-server /etc/redis/redis.conf --appendonly yes
# 创建 redis03-server
docker run -p 6381:6379 --name redis03 -v /opt/docker/redis03/conf/redis.conf:/etc/redis/redis.conf -v /opt/docker/redis03/data:/data -d redis:5.0.7 redis-server /etc/redis/redis.conf --appendonly yes

使用(二)、redisson使用

  1. 引入pom.xml
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.3.2</version>
        </dependency>
  1. 获取多个服务端连接
@Test
public void testSentinel() {
    // 创建到多个 master redis 的客户端连接
    Config config1 = new Config();
    config1.useSingleServer().setAddress("xxxxxxxx:6379").setPassword("xjx123456").setDatabase(0);
    RedissonClient redissonClient1 = Redisson.create(config1);

    Config config2 = new Config();
    config2.useSingleServer().setAddress("xxxxxxxx:6380").setPassword("xjx123456").setDatabase(0);
    RedissonClient redissonClient2 = Redisson.create(config2);

    Config config3 = new Config();
    config3.useSingleServer().setAddress("xxxxxxxx:6381").setPassword("xjx123456").setDatabase(0);
    RedissonClient redissonClient3 = Redisson.create(config3);

    String resourceName = "REDLOCK_KEY";
    RLock lock1 = redissonClient1.getLock(resourceName);
    RLock lock2 = redissonClient2.getLock(resourceName);
    RLock lock3 = redissonClient3.getLock(resourceName);
    // 向三个实例拿锁
    RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
    boolean isLock;
    // 500ms 拿锁,拿到锁有效期10s
    try {
        isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
        System.out.println("isLock = " + isLock);
        if (isLock) {
            System.out.println("成功获取到 RedLock 锁");
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        redLock.unlock();
    }
}

Redisson源码分析(一)、获取锁

上述代码中 tryLock() 方法实际调用了 RedissonMultiLocktryLock() 方法,其中 waitTime(获取锁的最长时间)、leaseTime(锁生效的最长时间)、unit(时间单位)

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long newLeaseTime = -1;
    if (leaseTime != -1) {
        newLeaseTime = waitTime*2;
    }
    
    long time = System.currentTimeMillis();
    long remainTime = -1;
    if (waitTime != -1) {
        remainTime = unit.toMillis(waitTime);
    }
    int failedLocksLimit = failedLocksLimit();
    List<RLock> lockedLocks = new ArrayList<RLock>(locks.size());
    
    // 循环遍历每把锁的实例集合,获取 RLock 对象
    for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
        RLock lock = iterator.next();
        boolean lockAcquired;
        try {
            if (waitTime == -1 && leaseTime == -1) {
                lockAcquired = lock.tryLock(); // 尝试获取锁,返回结果
            } else {
                long awaitTime = unit.convert(remainTime, TimeUnit.MILLISECONDS);
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, unit);
            }
        } catch (Exception e) {
            lockAcquired = false;
        }
         
        // 获取成功,添加到 lockedLocks 集合中
        if (lockAcquired) {
            lockedLocks.add(lock);
        } else {
            // 如果不能保证获取至少 N/2 + 1 把锁,就结束
            if (locks.size() - lockedLocks.size() == failedLocksLimit()) {
                break;
            }

            if (failedLocksLimit == 0) {
                // 不允许失败了,这里失败了就释放掉所有已成功添加的锁,方法是异步的
                unlockInner(lockedLocks);
                if (waitTime == -1 && leaseTime == -1) {
                    return false;
                }
                failedLocksLimit = failedLocksLimit();
                lockedLocks.clear();
                // reset iterator
                while (iterator.hasPrevious()) {
                    iterator.previous();
                }
            } else {
                // 否则 failedLocksLimit 减一,切换获取下一把锁
                failedLocksLimit--;
            }
        }
        
        if (remainTime != -1) {
            // 获取锁消耗的时间 和 允许最大获取时间 remainTime 比较
            remainTime -= (System.currentTimeMillis() - time);
            time = System.currentTimeMillis();

            // 获取超时,释放所有锁资源
            if (remainTime <= 0) {
                unlockInner(lockedLocks);
                return false;
            }
        }
    }

    if (leaseTime != -1) {
        List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(lockedLocks.size());
        for (RLock rLock : lockedLocks) {
            RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
            futures.add(future);
        }
        
        for (RFuture<Boolean> rFuture : futures) {
            rFuture.syncUninterruptibly();
        }
    }
    
    return true;
}

RedissonLocktryLock() 方法是实际单把锁的加锁逻辑:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    final long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);  // 核心方法
    // lock acquired
    if (ttl == null) {
        return true;
    }
    
    time -= (System.currentTimeMillis() - current);
    if (time <= 0) {
        acquireFailed(threadId);
        return false;
    }
    
    current = System.currentTimeMillis();
    final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                @Override
                public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                    if (subscribeFuture.isSuccess()) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                }
            });
        }
        acquireFailed(threadId);
        return false;
    }

    try {
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
    
        while (true) {
            long currentTime = System.currentTimeMillis();
            ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return true;
            }

            time -= (System.currentTimeMillis() - currentTime);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }

            // waiting for message
            currentTime = System.currentTimeMillis();
            if (ttl >= 0 && ttl < time) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }

            time -= (System.currentTimeMillis() - currentTime);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        }
    } finally {
        unsubscribe(subscribeFuture, threadId);
    }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
}

// 接上部分
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}


// 接上部分
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {
        // 锁生效时间不为 -1 时,详细的加锁逻辑
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }

            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

获取锁 redLock.tryLock() 的核心源码如下,其中默认的租约时间(leaseTime)是 LOCK_EXPIRATION_INTERVAL_SECONDS,即 30s。获取锁时会执行如下 LUA 指令:

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
    // 分布式key不存在,不存在就 hset创建(hset REDLOCK_KEY uuid+threadId 1),并且 pexpire 设置锁资源失效时间
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
    // 分布式Key已经存在,value也能对应上,可重入,hincrby 添加计数,重置锁失效时间
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
    // 获取分布式Key失效的毫秒数
                "return redis.call('pttl', KEYS[1]);",
    // 三个参数分别对应:
    // REDLOCK_KEY ——> KEYS[1]
    // leaseTime ——> ARGV[1]
    // uuid+threaId ——> ARGV[2]
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

总的来说,获取锁的逻辑为:

  1. 总共奇数个 Redis_Master 实例,每个实例对应一个 RedissonClient ,通过 getLock() 方法获取到对应的 RedissonLock 对象,所有对象存储在一个 ArrayList中;
  2. 遍历每个 RedissonLock 对象,通过 tryLock() 方法尝试获取锁,该方法底层通过 LUA 指令实现。锁在 redis 中以 hash 结构存储,哈希表名为 REDLOCK_KEY字符串、keyuuid+threadIdvalue 为整数值,记录锁重入的次数。
  3. 根据 tryLock() 返回的结果判断是否成功获取锁:
    1. 若成功直接加入到成功的集合中;
    2. 失败时先判断是否已满足最低加锁要求(至少需要 N/2 + 1 个机器加锁成功),已满足就退出,返回加锁成功;不满足就判断是否超过了最大失败次数(刚好能满足加锁成功的临界值),超过了就通过 unlockInner(xxx) 方法异步释放已经获取的锁资源;
  4. 当前机器加锁操作完成后(无论成功失败),都需要判断总的加锁耗时是否超过预设耗时,若超过,异步释放掉所有锁资源,返回 false。

Redisson源码分析(二)、释放锁

释放锁参考 RedissonMultiLock 类的 unlock() 方法,该方法返回 RFuture对象,然后异步的方式获取结果,如下:

public void unlock() {
    List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>(locks.size());

    for (RLock lock : locks) {
        futures.add(lock.unlockAsync());
    }

    for (RFuture<Void> future : futures) {
        future.syncUninterruptibly();
    }
}

方法底层调用了 unlockAsnyc() 方法,如下:

public RFuture<Void> unlockAsync() {
    long threadId = Thread.currentThread().getId();
    return unlockAsync(threadId);
}
public RFuture<Void> unlockAsync(final long threadId) {
    final RPromise<Void> result = newPromise();
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.addListener(new FutureListener<Boolean>() {
        @Override
        public void operationComplete(Future<Boolean> future) throws Exception {
            if (!future.isSuccess()) {
                result.tryFailure(future.cause());
                return;
            }

            Boolean opStatus = future.getNow();
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }
            if (opStatus) {
                cancelExpirationRenewal();
            }
            result.trySuccess(null);
        }
    });

    return result;
}

底层释放锁资源的 LUA 指令:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
    // 如果 Key 不存在,向 channel 发布一条消息
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
    // 如果分布式锁存在,但是 value 不匹配,说明锁已经占用了,返回
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
    // 当前线程就是持有分布式锁的线程,重入计数减1
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
    // 减去后值仍然大于0,重新设置锁有效时间
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
    // 否则删除Key,发布解锁消息
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
    // 四个参数对应:
    // KEYS[1]——>REDLOCK_KEY、KEYS[2]——>通道名称
    // ARGV[1]——>解锁消息
    // ARGV[2]——>锁生效时间
    // ARGV[3]——>uuid+threadId
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}

总的来说释放锁步骤如下:

  1. 遍历每一个 RedissonLock 对象,异步方式调用 unlockAsync() 方法,该方法底层依赖于 unlockInnerAsync() 的一系列 LUA 指令;
  2. LUA 指令先判断 key 是否存在,不存在就向 channel 发布消息;
  3. 若分布式锁存在,但是 value 值不匹配,说明锁已经被占用,返回;
  4. 若当前线程持有的就是分布式锁,重入计数减1;若减去后值仍然大于0,重新设置锁生效时间,否则删除对应的 key,发布解锁消息。