Redis分布式锁演进架构

发布时间 2023-10-06 18:40:26作者: Chimengmeng

【一】引言

  • 分布式锁相信大家一定不会陌生,想要用好或者自己写一个却没那么简单。
  • 想要达到上述的条件,一定要 掌握分布式锁的应用场景,以及分布式锁的不同实现,不同实现之间有什么区别。

【二】分布式锁场景

  • 如果想真正了解分布式锁,需要结合一定场景; 举个例子,某夕夕上抢购 AirPods Pro 的 100 元优惠券。

image-20231006173855512

  • 如果使用下面这段代码当作抢购优惠券的后台程序,我们一起看一下,可能存在什么样的问题。
import redis
from flask import Flask

app = Flask(__name__)
# 连接到本地的 Redis 服务器
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)

@app.route('/acquire')
def acquire_discount():
    # 锁的键名
    lock_key = "surplus"
    # 获取 Redis 缓存中剩余优惠券数量,如果为空则默认为 0
    surplus_num = int(redis_client.get(lock_key) or 0)

    # 如果优惠券已领完,返回 "fail"
    if surplus_num <= 0:
        return "fail"

    # 执行逻辑操作,这里可以添加具体的业务逻辑

    # 对优惠券剩余数量 -1
    surplus_num -= 1

    # 更新 Redis 缓存中的优惠券数量
    redis_client.set(lock_key, str(surplus_num))

    return "success"

if __name__ == '__main__':
    app.run(debug=True)
  • 很明显的就是这段流程在并发场景下并不安全,会导致优惠券发放超过预期,类似电商抢购超卖问题。
  • 想一哈有什么方式可以避免这种分布式下超量问题?
    • 互斥加锁,互斥锁的语义就是 同一时间,只允许一个客户端对资源进行操作。

【1】引入互斥锁

import redis
from flask import Flask
from redis.exceptions import LockError

app = Flask(__name__)
# 连接到本地的 Redis 服务器
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)

# 定义全局互斥锁
lock = redis_client.lock("discount_lock")

@app.route('/acquire')
def acquire_discount():
    # 尝试获取互斥锁,如果获取失败说明有其他客户端正在操作
    with lock:
        # 锁的键名
        lock_key = "surplus"
        # 获取 Redis 缓存中剩余优惠券数量,如果为空则默认为 0
        surplus_num = int(redis_client.get(lock_key) or 0)

        # 如果优惠券已领完,返回 "fail"
        if surplus_num <= 0:
            return "fail"

        # 执行逻辑操作,这里可以添加具体的业务逻辑

        # 对优惠券剩余数量 -1
        surplus_num -= 1

        # 更新 Redis 缓存中的优惠券数量
        redis_client.set(lock_key, str(surplus_num))

        return "success"

if __name__ == '__main__':
    app.run(debug=True)
  • 但是生产环境为了保证服务高可用,起码要 部署两台服务
  • 分布式情况下只能通过 分布式锁 来解决多个服务资源共享的问题了。

【2】分布式锁

  • 分布式锁的定义:保证同一时间只能有一个客户端对共享资源进行操作。

  • 比对刚才举的例子,不论部署多少台优惠券服务,只会有 一台服务能够对优惠券数量进行增删操作

  • 另外有几点要求也是必须要满足的:

    • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。

    • 具有容错性。只要大部分的 Redis 节点正常运行,客户端就可以加锁和解锁。

    • 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

  • 分布式锁实现大致分为三种,Redis、Zookeeper、数据库,文章以 Redis 展开分布式锁的讨论。

【三】分布式锁演进史

  • 先来构思下分布式锁实现思路。

  • 首先我们必须保证同一时间只有一个客户端(部署的优惠券服务)操作数量加减。其次本次 客户端操作完成后,需要让 其它客户端继续执行

    • 客户端一存放一个标志位,如果添加成功,操作减优惠券数量操作。

    • 客户端二添加标志位失败,本次减库存操作失败(或继续尝试获取等)。

    • 客户端一优惠券操作完成后,需要将标志位释放,以便其余客户端对库存进行操作。

【1】第一版 setnx

  • 向 Redis 中添加一个 lockKey 锁标志位,如果添加成功则能够继续向下执行扣减优惠券数量操作,最后再释放此标志位。
from flask import Flask
from redis import StrictRedis
from flask_redis import FlaskRedis

app = Flask(__name__)
# 连接到本地的 Redis 服务器
redis_store = FlaskRedis.from_custom_provider(StrictRedis, app)

@app.route('/acquire')
def acquire_discount():
    # 设置分布式锁标志位的键名
    lock_key = "lockKey"
    # 尝试获取分布式锁,如果已存在则返回 False
    # nx=True 参数,确保了分布式锁的获取是原子操作
    if not redis_store.set(lock_key, "lock", nx=True):
        return "fail"

    # 锁获取成功后执行业务逻辑
    surplus_key = "surplus"
    surplus_obj = redis_store.get(surplus_key)
    surplus_num = int(surplus_obj) if surplus_obj else 0

    # 执行逻辑操作,这里可以添加具体的业务逻辑

    # 库存数量不足,返回失败
    if surplus_num <= 0:
        # 释放锁
        redis_store.delete(lock_key)
        return "fail"

    # 对优惠券剩余数量 -1
    surplus_num -= 1

    # 更新 Redis 缓存中的优惠券数量
    redis_store.set(surplus_key, str(surplus_num))

    # 删除分布式锁
    redis_store.delete(lock_key)

    return "success"

if __name__ == '__main__':
    app.run(debug=True)

【2】第二版 expire

  • 上面第一版基于 setnx 命令实现分布式锁的缺陷也是很明显的,那就是 一定情况下可能发生死锁。
  • 画个图,举个例子说明哈。

image-20231006180140598

  • 上图说明,线程 1 在成功获取锁后,执行流程时异常结束,没有执行释放锁操作,这样就会 产生死锁
  • 如果方法执行异常导致的线程被回收,那么可以将解锁操作放到 finally 块中。
  • 但是还有存在死锁问题,如果获得锁的线程在执行中,服务被强制停止或服务器宕机,锁依然不会得到释放
  • 这种极端情况下我们还是要考虑的,毕竟不能只想着服务没问题对吧。
  • 对 Redis 的 锁标志位加上过期时间 就能很好的防止死锁问题,继续更改下程序代码。
from flask import Flask
from flask_redis import FlaskRedis
import time

app = Flask(__name__)
# 连接到本地的 Redis 服务器
redis_store = FlaskRedis(app)

@app.route('/acquire')
def acquire_discount():
    # 设置分布式锁标志位的键名
    lock_key = "lockKey"
    # 尝试获取分布式锁,如果已存在则返回 False
    if not redis_store.set(lock_key, "lock", nx=True):
        return "fail"

    # 对分布式锁标志位添加过期时间,确保即使业务逻辑执行失败也能释放锁
    redis_store.expire(lock_key, 5)

    surplus_key = "surplus"
    surplus_obj = redis_store.get(surplus_key)
    surplus_num = int(surplus_obj) if surplus_obj else 0

    # 库存数量不足,返回失败
    if surplus_num <= 0:
        # 释放锁
        redis_store.delete(lock_key)
        return "fail"

    # 执行逻辑操作,这里可以添加具体的业务逻辑

    # 对优惠券剩余数量 -1
    surplus_num -= 1

    # 更新 Redis 缓存中的优惠券数量
    redis_store.set(surplus_key, str(surplus_num))

    # 删除分布式锁
    redis_store.delete(lock_key)

    return "success"

if __name__ == '__main__':
    app.run(debug=True)
  • 虽然对分布式锁添加了过期时间,但依然无法避免极端情况下的死锁问题。
  • 那就是如果在客户端加锁成功后,还没有设置过期时间时宕机
  • 如果想要避免添加锁时死锁,那就对添加锁标志位 & 添加过期时间命令 保证一个原子性,要么一起成功,要么一起失败

【3】第三版 set

  • 我们的添加锁原子命令就要登场了,从 Redis 2.6.12 版本起,提供了可选的 字符串 set 复合命令
SET key value [expiration EX seconds|PX milliseconds] [NX|XX]
  • 可选参数如下:

    • EX: 设置超时时间,单位是秒。

    • PX: 设置超时时间,单位是毫秒。

    • NX: IF NOT EXIST 的缩写,只有 KEY 不存在的前提下 才会设置值。

    • XX: IF EXIST 的缩写,只有在 KEY 存在的前提下 才会设置值。

  • 继续完善分布式锁的应用程序,代码如下:

from flask import Flask
import redis
import time

app = Flask(__name__)
# 连接到本地的 Redis 服务器
cache = redis.StrictRedis(host='localhost', port=6379, password='123456', decode_responses=True)

@app.route('/acquire')
def acquire_discount():
    # 设置分布式锁标志位的键名
    lock_key = "lockKey"
    
    # 重点: 尝试设置分布式锁,成功返回 OK
    set_result = cache.set(lock_key, "lock", nx=True, ex=5)

    # 未获取到分布式锁标志位,失败
    if not set_result == "OK":
        return "fail"

    surplus_key = "surplus"
    surplus_obj = cache.get(surplus_key)
    surplus_num = int(surplus_obj) if surplus_obj else 0

    # 库存数量不足,返回失败
    if surplus_num <= 0:
        # 释放锁
        cache.delete(lock_key)
        return "fail"

    # 执行逻辑操作,这里可以添加具体的业务逻辑

    # 对优惠券剩余数量 -1
    surplus_num -= 1

    # 更新 Redis 缓存中的优惠券数量
    cache.set(surplus_key, str(surplus_num))

    # 删除分布式锁
    cache.delete(lock_key)

    return "success"

if __name__ == '__main__':
    app.run(debug=True)
  • 加锁以及设置过期时间确实保证了原子性,但是这样的分布式锁就没有问题了么?
  • 我们根据图片以及流程描述设想一下这个场景:

    • 线程一获取锁成功,设置过期时间五秒,接着执行业务逻辑;

    • 接着线程一获取锁后执行业务流程,执行的时间超过了过期时间,锁标志位过期进行释放,此时线程二获取锁成功;

    • 然而此时线程一执行完业务后,开始执行释放锁的流程,然后顺手就把线程二获取的锁释放了。

  • 如果线上真的发生上述问题,就可能会造成线上数据和业务的运行异常,更甚者可能存在线程一将线程二的锁释放掉之后,线程三获取到锁,然后线程二执行完将线程三的锁释放。

【4】第四版 verify value

  • 事当如今,只能创建辨别客户端身份的唯一值了,将加锁及解锁归一化,上代码。
from flask import Flask
import redis
import uuid

app = Flask(__name__)
# 连接到本地的 Redis 服务器
cache = redis.StrictRedis(host='localhost', port=6379, password='123456', decode_responses=True)

@app.route('/acquire')
def acquire_discount():
    # 设置分布式锁标志位的键名
    lock_key = "lockKey"

    # 生成客户端唯一标识
    lock_value = str(uuid.uuid4())

    # 重点: 尝试设置分布式锁,成功返回 OK
    set_result = cache.set(lock_key, lock_value, nx=True, ex=5)

    # 未获取到分布式锁标志位,失败
    if set_result != "OK":
        return "fail"

    surplus_key = "surplus"
    surplus_obj = cache.get(surplus_key)
    surplus_num = int(surplus_obj) if surplus_obj else 0

    # 库存数量不足,返回失败
    if surplus_num <= 0:
        # 释放锁
        cache.delete(lock_key)
        return "fail"

    # 执行逻辑操作,这里可以添加具体的业务逻辑

    # 对优惠券剩余数量 -1
    surplus_num -= 1

    # 更新 Redis 缓存中的优惠券数量
    cache.set(surplus_key, str(surplus_num))

    # 重点: 删除分布式锁,确保只有持有锁的客户端可以释放
    if cache.get(lock_key) == lock_value:
        cache.delete(lock_key)

    return "success"

if __name__ == '__main__':
    app.run(debug=True)
  • 这一版的代码相当于我们添加锁标志位时,同时为每个客户端设置了 uuid 作为锁标志位的 val,解锁时需要判断锁的 val 是否和自己客户端的相同,辨别成功才会释放锁。

  • 但是上述代码执行业务逻辑如果抛出异常,锁只能等待过期时间,我们可以将解锁操作放到 finally 块。

from flask import Flask
import redis
import uuid

app = Flask(__name__)
# 连接到本地的 Redis 服务器
cache = redis.StrictRedis(host='localhost', port=6379, password='123456', decode_responses=True)

@app.route('/acquire')
def acquire_discount():
    # 设置分布式锁标志位的键名
    lock_key = "lockKey"

    # 生成客户端唯一标识
    lock_value = str(uuid.uuid4())

    try:
        # 重点: 尝试设置分布式锁,成功返回 OK
        set_result = cache.set(lock_key, lock_value, nx=True, ex=5)

        # 未获取到分布式锁标志位,失败
        if set_result != "OK":
            return "fail"

        surplus_key = "surplus"
        surplus_obj = cache.get(surplus_key)
        surplus_num = int(surplus_obj) if surplus_obj else 0

        # 库存数量不足,返回失败
        if surplus_num <= 0:
            return "fail"

        # 执行逻辑操作,这里可以添加具体的业务逻辑

        # 对优惠券剩余数量 -1
        surplus_num -= 1

        # 更新 Redis 缓存中的优惠券数量
        cache.set(surplus_key, str(surplus_num))

    finally:
        # 重点: 删除分布式锁,确保只有持有锁的客户端可以释放
        if cache.get(lock_key) == lock_value:
            cache.delete(lock_key)

    return "success"

if __name__ == '__main__':
    app.run(debug=True)

【5】前四版小结

  • 真相就是: 解锁时, 由于判断锁和删除标志位并不是原子性的,所以可能还是会存在误删

    • 线程一获取锁后,执行流程 balabala... 判断锁也是自家的,这时 CPU 转头去做别的事情了,恰巧线程一的锁过期时间到了;

    • 线程二此时顺理成章的获取到了分布式锁,执行业务逻辑 balabala;

    • 线程一再次分配到时间片继续执行删除操作。

  • 解决这种非原子操作的方式只能 将判断元素值和删除标志位当作一个原子操作

【6】第五版 lua

  • 很不友好的是,del 删除操作并没有提供原子命令,所以我们需要想点办法。

  • Redis 在 2.6 推出了脚本功能,允许开发者使用 Lua 语言编写脚本传到 Redis 中执行。

  • 使用 Lua 脚本有什么好处呢?

    • 减少网络开销:原本我们需要向 Redis 服务请求多次命令,可以将命令写在 Lua 脚本中,这样执行只会发起一次网络请求。

    • 原子操作:Redis 会将 Lua 脚本中的命令当作一个整体执行,中间不会插入其它命令。

    • 复用:客户端发送的脚步会存储 Redis 中,其他客户端可以复用这一脚本而不需要使用代码完成相同的逻辑。

  • 那我们编写一个简单的 Lua 脚本实现原子删除操作。

from flask import Flask
import redis
import uuid

app = Flask(__name__)
# 连接到本地的 Redis 服务器
cache = redis.StrictRedis(host='localhost', port=6379, password='123456', decode_responses=True)

@app.route('/acquire')
def acquire_discount():
    # 设置分布式锁标志位的键名
    lock_key = "lockKey"

    # 生成客户端唯一标识
    lock_value = str(uuid.uuid4())

    try:
        # 重点: 尝试设置分布式锁,成功返回 OK
        set_result = cache.set(lock_key, lock_value, nx=True, ex=5)

        # 未获取到分布式锁标志位,失败
        if set_result != "OK":
            return "fail"

        surplus_key = "surplus"
        surplus_obj = cache.get(surplus_key)
        surplus_num = int(surplus_obj) if surplus_obj else 0

        # 库存数量不足,返回失败
        if surplus_num <= 0:
            return "fail"

        # 执行逻辑操作,这里可以添加具体的业务逻辑

        # 对优惠券剩余数量 -1
        surplus_num -= 1

        # 更新 Redis 缓存中的优惠券数量
        cache.set(surplus_key, str(surplus_num))

    finally:
        # 重点: 使用 Lua 脚本删除分布式锁,确保只有持有锁的客户端可以释放
        script = """
            local cliVal = redis.call('get', KEYS[1])
            if cliVal == ARGV[1] then
                redis.call('del', KEYS[1])
                return 'OK'
            else
                return nil
            end
        """
        cache.eval(script, 1, lock_key, lock_value)

    return "success"

if __name__ == '__main__':
    app.run(debug=True)
  • 重点就在 Lua 脚本这一块,重点说一下这块的逻辑。
  • script 脚本就是我们在 Redis 中执行的 Lua 脚本,后面跟的两个 List 分别是 KEYS、ARGV。
cache.eval(script, 1, lock_key, lock_value)
  • KEYS[1]: lockKey
  • ARGV[1]: lockValue

【四】待完成功能

  • 虽然上述代码已经很大程度上解决了分布式锁可能存在的一些问题。

  • 但是下述列出的问题部分就不是客户端代码范畴内的事情了:

    • 如何实现可重入锁。

    • 如何解决代码执行锁超时。

    • 主从节点同步数据丢失导致锁丢失问题。

  • 上述问题等下一篇介绍 Redisson 源码时会一一说明,顺道向大家推出一款 Redis 官方推荐的客户端: Redisson

    • 并不是推荐一定要用 Redisson,根据不同场景选用不同客户端。
  • Redisson 就是为分布式提供 各种不同锁以及多样化的技术支持, 感兴趣的小伙伴可以看一下 Giuhub 上的介绍,挺详细的。