一,分布式锁诞生的原因
为什么分布式锁会诞生?类似于淘宝双11的秒杀活动,同一件商品怎么才能只被一个用户抢到,其他用户抢不到?分布式锁就能巧妙地解决类似秒杀和抢单的问题。技术源于生活,更高于生活。对于阿里的那种的大型秒杀活动,分布式锁只是其中的一环,单单靠分布式锁不足以支撑那种大并发的情景,后续解决方案会陆续更新。本期只讲解分布式锁。
二,常见的分布式锁
1,基于数据库实现的分布式锁
基于表实现的分布式锁,如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁
2,Redis分布式锁
主要通过redis的存值取值进行判断的,根据返回的参数判断是否能拿到锁
3,Zookeeper分布式锁
利用Zookeeper的顺序临时节点,来实现分布式锁和等待队列。Zookeeper设计的初衷,就是为了实现分布式锁服务的。
三,SpringBoot项目中的Redis分布式锁
1,引入相关的依赖
首先你的springboot项目是需要正常的能够跑通的,然后在你的主pom文件引入redis的相关依赖
-
<!--springboot 集成reids-->
-
<dependency>
-
<groupId></groupId>
-
<artifactId>spring-boot-starter-data-redis</artifactId>
-
<version>2.1.3.RELEASE</version>
-
</dependency>
2,redis的分布锁的原理
一般情况像这种经常用到的代码,单独抽一个Redis工具类出来,方便自己查看和调用。比如,我们现在进行一个活动,整点秒杀一台macPro电脑。想要将商品秒杀到,那只需要改变数据库的商品表的该商品的状态置为已下架,同时创建订单即可。在整点的时候,很多人同时秒杀更新数据库,那我们如何保证只被一个人拿到?道理很简单,我们将下单(更新数据库,创建订单)打包成一个方法,在方法的外面加锁,该锁被第一个A线程拿到后,其他线程未拿到锁则返回【很遗憾~您手慢啦~】,A线程将该商品置为已下架并且创建订单后,释放该锁,防止死锁。哪怕后面的线程延迟,再A线程释放锁后又拿到下单方法,因为商品的状态为已下架同样没有办法进行创建订单。至此,达到我们最初的目的,秒杀功能完成。
3,加锁操作
redis有StringRedisTemplate 和RedisTemplate 。我这里使用前者实现。
-
@Autowired
-
private StringRedisTemplate redisTemplate;
在高并发的情况下,确保某一个方法只能被一个人调用,那么我们只要在该方法外调用工具类的加锁方法,该加锁方法返回true,则代表该方法没有被其他线程占用。若返回false,则代表该方法已经被其他线程占用,同步返回【很遗憾~您手慢啦~】。
-
/**
-
* 对传过来的redis的key进行加锁
-
* 秒
-
* @param key 需要加锁的key
-
* @param expire 过期时间
-
* @return
-
*/
-
public Boolean lockEnable(String key, long expire) {
-
-
//这是将当前线程的名字置为key的value值,表明该锁被谁拿到
-
String keyValue = ().getName();
-
-
//1,这是StringRedisTemplate在set key的同时增加了过期时间,防止死锁。保证了原子性。
-
//2,setIfAbsent该方法如果该key不存在时候,设置值进去后,返回true;若是已经存在,则返回false;
-
Boolean aBoolean = ().setIfAbsent(key, keyValue, expire, );
-
Long surplusTime = (key, );
-
if (!aBoolean) {
-
("该线程【{}】加锁失败,该key【{}】剩余过期时间【{}】秒", keyValue, key, surplusTime);
-
return false;
-
}
-
("该线程【{}】加锁成功,该key【{}】剩余过期时间【{}】秒", keyValue, key, surplusTime);
-
return true;
-
}
4,解锁操作
解锁这边不可以单纯的删除redis的值,这里需要对key和value两个参数进行和redis里面存储的是否一致,防止误删别人的锁。避免其他错误的产生,由于我们设置锁的时候,锁和失效时间有原子性,故不存在加完锁后就宕机,导致死锁。
-
@Autowired
-
private DefaultRedisScript<Long> redisScript;
-
/**
-
* lua脚本
-
*
-
* @return
-
*/
-
@Bean
-
public DefaultRedisScript<Long> defaultRedisScript() {
-
DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>();
-
();
-
("if ('get', KEYS[1]) == KEYS[2] then return ('del', KEYS[1]) else return 0 end");
-
return defaultRedisScript;
-
}
同样的在完成该方法后一定要记得解锁,不要因为有过期时间就不释放锁,不要给自己埋坑,自己偷的懒早晚要还回来的。
-
/**
-
* 对传过来的redis的key进行解锁
-
* key和value不一致时,返回:【0】
-
* key和value不一致时,返回:【1】
-
* @param key
-
* @return
-
*/
-
public Boolean lockUnable(String key) {
-
String keyValue = ().getName();
-
//key和value不一致时,返回:【0】
-
//key和value不一致时,返回:【1】
-
Long execute = (redisScript, (key, keyValue));
-
if(execute != 1 ){
-
Boolean aBoolean = (key);
-
Long surplusTime = (key, );
-
("该key【{}】解锁失败,是否存在【{}】,剩余过期时间【{}】秒", key, aBoolean, surplusTime);
-
return false;
-
}
-
("该key【{}】解锁成功", key);
-
Boolean aBoolean = (key);
-
("该key是否存在【{}】",aBoolean);
-
return true;
-
}
5,分布式锁的测试类
由于数据敏感问题,我这边自己使用了自己创建的员工表进行模拟秒杀,效果是一致的,我们锁的多台服务器上面的同一个方法。
-
/**
-
* 1000个线程抢一条数据
-
*/
-
@Test
-
public void catchData() {
-
for (int i = 0; i <1000; i++) {
-
Thread thread = new Thread(() -> {
-
threadTest();
-
});
-
();
-
("thread" + i);
-
}
-
while (true){
-
-
}
-
}
-
/**
-
* 线程调用的测试方法
-
*/
-
public void threadTest() {
-
-
/**
-
* 对某一条数据的id进行加锁
-
*/
-
Boolean aBoolean = ("狄仁杰", 600);
-
-
-
if (!aBoolean) {
-
("线程【{}】没有拿到锁,结束流程",().getName());
-
return;
-
}
-
UpdateUserDTO updateUserDTO = new UpdateUserDTO();
-
("狄仁杰");
-
("UNABLE");
-
("15555406855");
-
(().getName());
-
Result<Boolean> result = (updateUserDTO);
-
if (!()) {
-
("线程【{}】更新数据失败",().getName());
-
return;
-
}
-
("线程【{}】更新数据成功",().getName());
-
-
-
/**
-
* 释放该条数据的锁
-
*/
-
Boolean aBoolean1 = ("狄仁杰");
-
("线程【{}】是否成功释放锁:【{}】",().getName(),aBoolean1);
-
}
5,分布式锁的测试结果展示
数据库之前的数据
跑完测试之后的数据
在打印的全部日志中,1000个线程只有一个线程拿到锁,其他线程全部失败。测试成功。
觉得写得你还满意,点下关注哈~如果有问题可以下面评论一起探讨下^~^