python下使用redis分布式锁

发布时间 2023-07-07 10:53:00作者: 春游去动物园

python下使用redis分布式锁

1.什么场景需要分布式锁?

我们在写业务逻辑的时候,如果多个线程同时访问某个共享变量,一般是对变量进行上锁或者使用 queue.Queue() 实现,以做到线程安全保证数据不被污染。

在单机部署的情况下这样做完全没问题,但是随着业务规模的发展,某些单机部署的系统需要进化成分布式集群系统,需要部署到多台服务器上去做负载均衡,这就会使得以前的并发控制策略失效,这个时候就需要一种跨机器的互斥机制来控制共享资源的访问,以保证线程安全,这个时候就需要使用到分布式锁。

2.分布式锁应该具备的条件:

① 在分布式系统环境下,同一时间只能有一个客户端能持有锁,且保证加锁和解锁的必须是同一个客户端

② 为了防止死锁,需要具备锁失效机制,即使一个客户端在持有锁期间崩溃没有主动释放锁,也能保证后续其他客户端能正常加锁

③ 具备非堵塞特性,没有获取到锁则直接返回获取锁失败。

3.redis 分布式锁的原理

redis 中的 SETNX 命令可以实现「key不存在才插入」:

如果 key 不存在,则显示插入成功,用来表示加锁成功

如果 key 存在,则会显示插入失败,用来表示加锁失败

通过 setnx 设置分布式锁,拿到这个锁的线程就可以执行业务代码,没有拿到的则只能进行等待,执行完业务代码后的线程需要通过 del key 释放锁,让其他线程能够重新获取,这样就能实现在分布式系统并发情况下,始终只有一个线程在执行业务代码。

4.demo

'''
	pip install redis
'''

import time
import uuid
import redis
from threading import Thread

# redis 存字符串返回的是byte, 指定 decode_responses=True 可以解决
pool = redis.ConnectionPool(host="127.0.0.1", port=6379, socket_connect_timeout=3, decode_responses=True)
redis_cli = redis.Redis(connection_pool=pool)


# 加锁
def acquire_lock(lock_name, acquire_timeout=4, lock_timeout=7):
    """
    param lock_name: 锁名称
    param acquire_timeout: 客户端获取锁的超时时间
    param lock_timeout: 锁过期时间, 超过这个时间锁自动释放
    """
    identifier = str(uuid.uuid4())
    end_time = time.time() + acquire_timeout  # 客户端获取锁的结束时间
    while time.time() <= end_time:
        # setnx(key, value) 只有 key 不存在情况下将 key 设置为 value 返回 True
        # 若 key 存在则不做任何动作,返回 False
        if redis_cli.setnx(lock_name, identifier):
            redis_cli.expire(lock_name, lock_timeout)  # 设置锁的过期时间,防止线程获取锁后崩溃导致死锁
            return identifier  # 返回锁唯一标识
        elif redis_cli.ttl(lock_name) == -1:  # 当锁未被设置过期时间时,重新设置其过期时间
            redis_cli.expire(lock_name, lock_timeout)
        time.sleep(0.001)
    return False  # 获取超时返回 False


# 释放锁
def release_lock(lock_name, identifier):
    """
    param lock_name:   锁名称
    param identifier:  锁标识
    """
    # 解锁操作需要在一个 redis 事务中进行,python 中 redis 事务通过 pipeline 封装实现
    with redis_cli.pipeline() as pipe:
        while True:
            try:
                # 使用 WATCH 监听锁,如果删除过程中锁自动失效又被其他客户端拿到,即锁标识被其他客户端修改
                # 此时设置了 WATCH 事务就不会再执行,这样就不会出现删除了其他客户端锁的情况
                pipe.watch(lock_name)
                id = pipe.get(lock_name)
                if id and id == identifier:  # 判断解锁与加锁线程是否一致
                    pipe.multi()  # 开启事务
                    pipe.delete(lock_name)  # 标识相同,在事务中删除锁
                    pipe.execute()  # 执行EXEC命令后自动执行UNWATCH
                    return True
                pipe.unwatch()
                break
            except redis.WatchError:
                pass
        return False


# 发售商品件数
goods = 10


def exec_test(thread_name):
    identifier = acquire_lock('jaye')
    if identifier:  # 如果获取到锁,则执行业务逻辑
        print(f'{thread_name}获取 redis 分布式锁成功!')
        time.sleep(3)  # 模拟业务耗时
        global goods
        goods = goods - 1
        print(f'------------------------{thread_name}抢到了东西------------------------------')
        res = release_lock('jaye', identifier)  # 处理完之后释放锁
        print(f'{thread_name} 释放状态: {res}')
    else:
        print(f'{thread_name}获取 redis 分布式锁失败, 其他线程正在使用')


if __name__ == '__main__':
    for i in range(1000):
        t_name = f'thread_{i}'
        t = Thread(target=exec_test, args=(t_name,))
        t.start()
    time.sleep(10)
    print('还剩下{}'.format(goods))

5.注意

'''
    ps:redis主从模式中数据的复制是异步的,如果在主节点获取到锁后,没来得及同步到从节点就宕机了,哨兵重新选举新的主节点后依然可以获取锁,也就导致多个服务可以同时拿到锁操作资源,这种情况下可以使用Redlock。
'''