使用Redis构建全局并发锁

时间:2022-05-03 05:49:19

谈起Redis的用途,小伙伴们都会说使用它作为缓存,目前很多公司都用Redis作为缓存,但是使用Redis仅仅作为缓存未免太大材小用了。深究Redis的原理后你会发现它有很多用途,在很多场景下能够使用它快速地解决问题。常见的用途有:分布式锁控制并发、结合bloom filter用于推荐去重、HyperLogLog用于统计UV、限流控制流量等等;这里我谈下Redis分布式锁控制并发的问题。

高并发是个老生常谈的问题,当产品达到一定规模用户量后,这个问题是不得不考虑的,即使当前用户量不大(例如博主现在的公司),但自己平时在设计API的时候最好也尽可能地考虑到并发问题。

Redis分布式锁控制并发主要是通过在Redis里面创建一个key,当其它进程准备占用的时候只能等待key释放再占用。Redis里面有一个原子性指令setnx,当key存在时,它返回0,表示当前已有进程占用,当它返回1时可以执行业务逻辑,此时没有进程占用,等逻辑执行完后,可以删除key释放锁,这样可以简单的控制并发。

使用Redis构建全局并发锁

但是细想之下你会发现,在业务逻辑执行的过程中如果发生异常,此时key并没有删除,这样就会造成死锁,死锁带来的后果想必大家都很清楚。为了解决这个问题,可以在setnx加锁后设置key的过期时间,当key到期自动删除。

使用Redis构建全局并发锁

但是仔细想想你还会发现,如果在执行setnx后,执行expire前Redis发生宕机了,这样就不会执行expire,也会造成死锁。由于setnx与expire是两条命令,并且expire依赖setnx的执行结果,为了解决这个问题可以使用set key value [expiration EX seconds|PX milliseconds] [NX|XX] ,这是一条原子性的指令,同时包含setnx和expire。

使用Redis构建全局并发锁

使用python实现的代码:

 class RedisLock(object):
"""
踩坑 Redis并发锁
""" def __init__(self, key):
self.redis_conn = get_redis_conn()
self.lock_key = "{}_redis_gil".format(key) @staticmethod
def get_lock_value(cls):
"""
获取value
:param cls:
:return:
"""
cls.get_lok = cls.redis_conn.get(cls.lock_key)
return cls.get_lok @staticmethod
def set_lock(cls, random_value):
"""
不能使用setnx 没有设置过期时间,可能会出现死锁
引入random_value :自己加的锁只能自己释放
:param cls:
:param random_value:
:return:
"""
cls._lock = cls.redis_conn.set(cls.lock_key, random_value, nx=True, ex=5) # 如果返回null 表示key存在存在并发
if cls._lock:
return True
else:
LOGGER = logging.getLogger('core.utils')
LOGGER.warning(u"试题复制存在并发")
raise RsError("试题复制存在并发,请稍后再试") @staticmethod
def release(cls):
"""
释放锁
:param cls:
:return:
"""
cls.redis_conn.delete(cls.lock_key) @staticmethod
def redis_lock(cls):
"""
只有当设置的value与do_something执行完后所获取的值相同时才删除key
防止在分布式中: clientA由于执行时间过期(clientA的执行时间比设置的过期时间大),clientB获取锁,
clientA执行完后释放锁(删除key),其实这时候删除的是B的key,
为防止这种情况引入random_value 只有当前值为random_value时才删除
:param cls:
:return:
"""
random_value = time.time()
if cls.set_lock(cls, random_value):
do_something()
now_value = cls.get_lock_value(cls)
if now_value == random_value:
cls.release()
return True
else:
return False def do_something():
pass

在实际业务中调用Redis全局锁,进行加锁示例:

 # 公库试题复制到平台考虑并发问题,加锁处理
if self.visible_scope == 10:
key = hash(self.question_id)
cls = RedisLock(key)
cls.redis_lock(cls)
try:
self.insert_question()
except Exception:
raise RsError("试题插入失败")
finally:
cls.release(cls)

如果是Redis集群下此方法可能仍然有问题,试想下:在一个redis集群中,主节点由于某种原因挂掉了,从节点变成了主节点,而此时redis锁还未同步到原从节点中,那么这个锁也就失效了,当其它进程申请锁时仍然可以申请成功。

针对这个问题,新版的redis引入了redlock,通过redlock.Redlock对多个redis节点进行加锁,当超过一半的节点加锁成功时锁才生效。这样在一定程度上提高了高可用性,但由于每次加锁和释放锁要对多个节点进行读写,所以性能上肯定是没有单节点锁高的。