目录
一. 分布式锁概述
- 一个靠谱的分布式锁需要具备的条件
1)独占: 同一时间内只允许一个线程获取到锁
2)高可用: 例如使用redis做分布式锁,不能因为一个节点挂了而造成获取释放锁失败的情况
3)防止死锁: 杜绝死锁,必须有超时控制,可撤销,有最终兜底跳出解决方案
4)不乱抢: 只允许自己的锁自己释放
5)重入性: 同一个节点,同一个线程获取到锁后,运行再次获取
- redis 实现分布式锁与zookeeper实现分布式锁的不同(如果要保证高可用使用zookeeper,如果保证高并发用redis)
zookeeper实现的锁是cp: zk基于有且仅有一个zonde节点实现,加锁成功就是建立一个节点,使用完成后自己删除,zk在同步数据时所有节点都同步成功后才返回成功,所以说是cp
redis集群版是ap(高可用+分区容错性):基于key是否存在+lua脚本实现,官网推荐redlock,redis同步是异步进行的先响应成功,通过异步线程去同步数据,所以说是ap,进而引出一个问题,在同步前主节点宕机,后选举出的主节点中没有锁数据,也就是数据不一致问题,redlock中可以通过多多节点同时上锁,都上锁成功才返回成功来解决这个问题
- redis实现锁的功能主要用到的命令
- 注意点 setnx+expire两条命令是非原子性的,不安全
二. redis 锁基础版示例
- 项目中引入依赖,配置连接redis服务器,配置获取RedisTemplate
<!--SpringBoot与Redis整合依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!-- jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.1.0</version>
</dependency>
- 购物需求分析redis分布式锁,将商品数据存储到reids,通过商品id,获取reids中商品减库存
import org.redisson.Redisson;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import redis.clients.jedis.Jedis;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@RestController
public class GoodController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private Jedis jedis;
@Autowired
private Redisson redisson;
@GetMapping("/buyGoods")
public String lockRedis(String goodsId) throws Exception {
//1.获取唯一id,释放锁时根据该id判断释放的是否是对应的
String value = UUID.randomUUID().toString();
try {
//2.获取锁,并判断是否获取成功,使用setIfAbsent()方法,对应redis中的setnx命令,并指定锁的失效时间
//不用get()+set()原因是这两个方法不能保证原子性
boolean b = stringRedisTemplate.opsForValue().setIfAbsent("goods:", value, 10L, TimeUnit.SECONDS);
if (!b) {
return "获取锁失败";
}
//3.到此处说明获取锁成功,执行正常逻辑,获取商品数据,对商品进行减库操作
String result = stringRedisTemplate.opsForValue().get("goods:" + goodsId);
int goodsNum = null == result ? 0 : Integer.parseInt(result);
if (0 == goodsNum) {
return "商品以售空";
}
//减库,并写回redis
int num = goodsNum - 1;
stringRedisTemplate.opsForValue().set("goods:" + goodsId, String.valueOf(num));
return "购买成功";
//4.防止代码异常再finally中释放锁
} finally {
//5.防止出现异常等在finally中释放锁
//防止a线程释放b线程锁的问题,通过唯一id判断
/*if (stringRedisTemplate.opsForValue().get("goods:").equalsIgnoreCase(value)) {
stringRedisTemplate.delete("goods:");
}*/
//6.在第5步中分别去获取指定key的值然后删除该key的值,代表释放指定锁
//分为两步,不是原子性的,若在此时不是一个客户端,会误删,优化: 使用脚本
String script ="if redis.call('get',KEYS[1]) == AVG[1] " +
"then" +
" return redis.call('del',KEYS[1]) " +
"else" +
" return 0 end";
try{
Object result = jedis.eval(script, Collections.singletonList("goods:"), Collections.singletonList(value));
if ("1".equals(result.toString())) {
System.out.println("del lock success");
}else {
System.out.println("del lock error");
}
}finally {
if (null != jedis) {
jedis.close();
}
}
}
}
}
- 上述代码第六步中的lua脚本转换为java就是
- 配置连接池,配置jedis
/**
* 配置redis连接池
*
* @return
*/
@Bean
public JedisPoolConfig jedisPoolConfig() {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
//最大空闲数,默认8
jedisPoolConfig.setMaxIdle(8);
//最大连接数,默认8
jedisPoolConfig.setMaxTotal(8);
//建立连接最大等待时间,单位毫秒,默认-1,永不超时不建议使用
jedisPoolConfig.setMaxWaitMillis(10000);
//逐出连接最小空闲时间(默认1800000毫秒)
//可根据自身业务决定,一般默认值即可,也可以考虑使用下方JeidsPoolConfig中的配置。
jedisPoolConfig.setMinEvictableIdleTimeMillis(1800000);
//每次逐出检查时,逐出的最大数据,如果为负就是:1/abs(n),默认3,
//可根据自身应用连接数进行微调,如果设置为 -1,就是对所有连接做空闲监测。
jedisPoolConfig.setNumTestsPerEvictionRun(3);
//逐出扫描间隔时间单位毫秒,如果为负,则不允许逐出现场,默认-1
//建议设置,周期自行选择,也可以默认也可以使用下方JedisPoolConfig 中的配置。
jedisPoolConfig.setTimeBetweenEvictionRunsMillis(-1);
//是否从翅中取出连接进行检查,如果检查失败,则从池中去除并新取一个
//默认false,业务量很大时候建议设置为false,减少一次ping的开销。
jedisPoolConfig.setTestOnBorrow(false);
//在空闲时检查有效性,默认false,建议开启
jedisPoolConfig.setTestWhileIdle(true);
return jedisPoolConfig;
}
@Bean
public Jedis jedis(JedisPoolConfig jedisPoolConfig) {
JedisPool jedisPool = new JedisPool(jedisPoolConfig, "411.100.63.107", 16379);
return jedisPool.getResource();
}
- 对上述代码解释,通过向redis中存储一个指定key代表锁,存储成功说明加锁成功,失败说明加锁失败
1)在第一步中获取一个唯一id,在释放锁时通过这个唯一id防止a线程释放了b线程锁的问题
2)在第二步中使用 setIfAbsent(key,value) 代替"get()+setNx()"尝试添加锁,保证原子性
3)在第三步中对库存进行减少操作,并更新库存
4)在第四步中使用finally,防止在释放锁以前代码出现异常锁无法释放
5)在第五步中(有问题,所以注释掉了使用第六步)获取锁,并判断value释放相等释放锁,但是获取锁,释放锁分为两步,不能保证原子性
6)在第六步中使用脚本,防止第五步释放锁不能保证原子性问题
- 提出上面代码中还存在的问题
1)上面设置了过期时间为10,你怎么确定在这个时间内业务逻辑能够正常执行完毕?
2)如果当前架构中使用的是redis集群,redis集群情况下,采用的异步通知模式,一个节点接收请求,然后通过主节点去异步通知其它节点,在获取锁(想reids中存储指定key)或释放锁(删除redis指定key),例如获取锁,一个节点接收到插入请求,在插入成功后,通知其它节点前,主节点宕机,在没有被通知到的子节点中重新选出了一个主节点,这个主节点中并没有这把锁,也不回进行通知,这个时间进来另外一个获取锁的请求打到了还未被通知的节点上,这时候发现没有,就会出现锁不住的情况
- 解决上面的问题:
三. redis 锁进阶 Redlock
- 复习redis集群下与ZooKeeper集群下各自的同步流程
1)redis集群AP: 在集群环境中多个redis 分为master主库与Salve从库,当Salve启动后会连接到master主库,并发送一个sync命令,master接收到该命令后台启动一个进程,收集接收到的操作数据指令缓存为快照文件,当缓存完毕后,将这个文件发送给所有连接到该master的Salve从库,Salve将文件接收保存到磁盘上,然后加载到内存中,后续master主库接收到的修改数据指令都会通过这个后台进程发送给Salve,是异步的
2)Zookeeper集群CP: Zookeeper集群下也是分为master与salve节点,假设向salve拿到存储请求后,会将信息同步给主节点,主节点通知将数据同步给其它节点,全部同步成功后,才会响应成功,假设当主节点宕机时,集群会重新选举,在选举期间整个zookeepre是不可用的,只有轩主成功后才可以继续使用
3)总结: ZooKeeper为了保证数据一致性牺牲了可靠性,而redis集群是保证高可用,在选择时根据需求,是想要高一致性,还是高可用,例如金额相关对一致性要求极为严格的可以选择ZooKeeper,例如抢购等保证高可用的可以选择redis
- 使用 Redisson 中封装好的锁也就是Redlock
- 集群环境下增加引入Redisson 依赖,配置Redisson,做分布式锁
<!-- redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.4</version>
</dependency>
- 配置redisson注入到容器
/**
* 单机版
* @return
*/
@Bean
public Redisson redisson() {
Config config = new Config();
config.useSingleServer().setAddress("redis://443.100.63.107:16379").setDatabase(0);
return (Redisson) Redisson.create(config);
}
/**
* 集群版
* @return
*/
@Bean
public RedissonClient getRedisson() {
//redi集群地址
String cluster="10.10.1.1:7000,10.10.1.1:7001,10.10.1.1:7002,10.10.1.1:7003,10.10.1.1:7004,10.10.1.1:7005";
String[] nodes = cluster.split(",");
//redisson版本是3.5,集群的ip前面要加上“redis://”,不然会报错,3.2版本可不加
for (int i = 0; i < nodes.length; i++) {
nodes[i] = "redis://" + nodes[i];
}
Config config = new Config();
//SentinelServersConfig serverConfig = config.useSentinelServers()
//useSentinelServers() 与 useClusterServers() 前者要指定masterName
//调用 setMasterName("masterName")
config.useClusterServers() //这是用的集群server
.addNodeAddress(nodes)
.setScanInterval(2000) //设置集群状态扫描时间
.setPassword("password")
.setTimeout(3000)
.setMasterConnectionPoolSize(8)
.setSlaveConnectionPoolSize(8)
.setSlaveConnectionMinimumIdleSize(1)
.setMasterConnectionMinimumIdleSize(1);;
RedissonClient redisson = Redisson.create(config);
//可通过打印redisson.getConfig().toJSON().toString()来检测是否配置成功
return redisson;
}
- 使用示例
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import redis.clients.jedis.Jedis;
@RestController
public class GoodController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private Redisson redisson;
@GetMapping("/buyGoods")
public String buyGoods(String goodsId) throws Exception {
//1.通过 Redisson 对指定key加锁(注意点相同锁key相同)
RLock redissonLock = redisson.getLock("goods:");
//2.加锁
redissonLock.lock();
//3.加锁并指定失效时间
//redissonLock.lock(10,TimeUnit.SECONDS);
//4.tryLock(waitTime, leaseTime, 时间单位)
//waitTime: 抢锁时等待时间,正常情况下3秒
//leaseTime: 获取到锁后的锁失效时间,正常情况下300秒
redissonLock.tryLock(3, 300, TimeUnit.SECONDS);
try {
//4.获取到锁的线程获取商品对商品数量进行减少操作
String result = stringRedisTemplate.opsForValue().get("goods:" + goodsId);
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if (goodsNumber <= 0) {
return "商品已经售罄";
}
int realNumber = goodsNumber - 1;
//5.更新商品数量
stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
return "成功秒杀商品,此时还剩余:" + realNumber + "件";
} finally {
//6.防止发生异常通过finally释放锁资源
//7.redissonLock.isLocked()判断当前是否持有锁
//redissonLock.isHeldByCurrentThread()//判断当前持有的锁是否是当前线程下的
if (redissonLock.isLocked() && redissonLock.isHeldByCurrentThread()) {
redissonLock.unlock();
}
}
}
}
- 方法摘要
Redisson getLock(“lockKey”) 对指定key加锁
lock() 获取锁 / lock(10,TimeUnit.SECONDS) 获取锁并指定失效时间
tryLock(waitTime, leaseTime, 时间单位)
isLocked() 判断是否还持有锁
isHeldByCurrentThread() 判断持有的锁是否是当前线程下的
unlock() 释放锁
四. Redlock 分析
- 对 redlock的解释: 多个服务器间,保证同一时间段内,只有一个请求,防止数据出现并发安全问题,该锁在java中通过Redisson实现,使用时需要引入Redisson依赖,主要是针对上面使用setNx+指定key做锁时,出现的锁超时问题,与集群环境下异步同步,主节点宕机无法锁无法同步问题
- 中文网
- 设计理念:(集群redis计算公式: number= 2*宕机台数+1, 宕机多少台后不影响正常使用)
解决集群环境master宕机数据不一致锁不住的问题
- 首先上面提到过集群架构redis台数计算公式:2*允许宕机台数+1=不影响正常使用的机器总台数
- 上面设计理念中提到过: 集群环境中使用Redlock解决数据不一致情况,舍弃主从节点架构,通过多个节点去获取锁,获取锁的个数是n/2+1(n表示允许宕机台数)=至少获取锁个数,才表示获取锁成功
- 使用示例: 项目中引入Redisson依赖
- redis集群环境不同服务器地址配置到不同的redisson中,注入到容器
@Bean
public RedissonClient redissonClient1() {
Config config = new Config();
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress("redis://第一台redis地址")
.setTimeout(3000)
.setConnectionPoolSize(64)
.setConnectionMinimumIdleSize(24);
serverConfig.setPassword("Password");
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient2() {
Config config = new Config();
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress("redis://第二台redis地址")
.setTimeout(3000)
.setConnectionPoolSize(64)
.setConnectionMinimumIdleSize(24);
serverConfig.setPassword("Password");
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient3() {
Config config = new Config();
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress("redis://第三台redis地址")
.setTimeout(3000)
.setConnectionPoolSize(64)
.setConnectionMinimumIdleSize(24);
serverConfig.setPassword("Password");
return Redisson.create(config);
}
- 使用示例
import lombok.extern.slf4j.Slf4j;
import org.redisson.RedissonRedLock;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;
@RestController
@Slf4j
public class RedLockController {
public static final String CACHE_KEY_REDLOCK = "ATGUIGU_REDLOCK";
@Autowired
RedissonClient redissonClient1;
@Autowired
RedissonClient redissonClient2;
@Autowired
RedissonClient redissonClient3;
@GetMapping(value = "/redlock")
public void getlock() {
//1.通过多个redis节点获取锁
RLock lock1 = redissonClient1.getLock(CACHE_KEY_REDLOCK);
RLock lock2 = redissonClient2.getLock(CACHE_KEY_REDLOCK);
RLock lock3 = redissonClient3.getLock(CACHE_KEY_REDLOCK);
//三个节点都获取到锁,才表示获取锁成功
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
boolean isLockBoolean;
try {
//2.tryLock()尝试获取锁
//waitTime 抢锁的等待时间,正常情况下 等3秒
//leaseTime就是redis key的续时时间,正常情况下5分钟300秒。
isLockBoolean = redLock.tryLock(3, 300, TimeUnit.SECONDS);
log.info("线程{},是否拿到锁:{} ",Thread.currentThread().getName(),isLockBoolean);
if (isLockBoolean) {
System.out.println(Thread.currentThread().getName()+"\t"+"---come in biz");
//业务逻辑,忙10分钟
try { TimeUnit.MINUTES.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
}else{
System.out.println("获取锁失败");
}
} catch (Exception e) {
log.error("redlock exception ",e);
} finally {
//无论如何, 最后都要解锁
//由于锁的可重入性,加几次锁就要释放几次,具体查看锁的可重入性分析
redLock.unlock();
redLock.unlock();
redLock.unlock();
}
}
}
- 解释上述代码:
1)不同redis服务器配置到不同的redisson中
2)获取锁时对多个redisson同时去获取,都获取到才表示获取锁成功,也就是分别对三个redis节点添加代表锁的key,都添加成功才表示获取锁成功,这样在后续操作中有一台机器如果宕机,不会影响到锁
锁的定时续期
- 怎么对锁设置超时时间,指定时间内无法释放怎么办
- watchdog看门狗: 在redisson中提供了一个额外的守护线程,定期检查主线程是否还持有锁,如果有则延长锁的过期时间,源码中设置检查时间为(每1/3锁的时间检查一次)
watchdog源码分析
- 查看Redisson 中的 lock()获取锁方法发现Redisson中实现了 "java.util.concurrent.locks " JUC 并发包下的接口,查看在Redisson中对该接口的实现类RedissonLock,发现
leaseTime 默认为 -1
public void lock() {
try {
//继续查看该lock方法
this.lock(-1L, (TimeUnit)null, false);
} catch (InterruptedException var2) {
throw new IllegalStateException();
}
}
- 继续向下查看lock方法 ,在该方法中调用了tryAcquire(-1L, leaseTime, unit, threadId),在不设置leaseTime的情况下默认传递了-1
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
//查看tryAcquire()方法
Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
if (ttl != null) {
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
if (interruptibly) {
this.commandExecutor.syncSubscriptionInterrupted(future);
} else {
this.commandExecutor.syncSubscription(future);
}
try {
while(true) {
ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
if (ttl == null) {
return;
}
if (ttl >= 0L) {
try {
((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException var13) {
if (interruptibly) {
throw var13;
}
((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else if (interruptibly) {
((RedissonLockEntry)future.getNow()).getLatch().acquire();
} else {
((RedissonLockEntry)future.getNow()).getLatch().acquireUninterruptibly();
}
}
} finally {
this.unsubscribe(future, threadId);
}
}
}
- 查看tryAcquire()方法最终会执行到tryAcquireAsync(),在该方法内部会判断传递的leaseTime是否等于-1(默认情况下-1,也就是不设置情况下),如果传递了说明设置了超时时间, 如果未传递没有设置超时时间执行tryLockInnerAsync()时设置了一个lockWatchdogTimeout = 30000L,了解到默认不设置超时时间情况下watchdog超时时间为30秒,并且该方法执行会返回一个RFuture,这是一个FutureTask,通过该FutureTask实现延时续命
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
//1.在我们获取锁时如果传递了leaseTime 不等于-1,走该流程
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
//2.如果未传递leaseTime走该流程,执行getLockWatchdogTimeout()拿了一个超时时间30000L
//并且该方法执行会返回一个RFuture<Long>,这是一个FutureTask,通过该FutureTask实现延时续命
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
//3.通过返回的RFuture执行 onComplete()
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining == null) {
//4.内部执行scheduleExpirationRenewal()方法,插入了一个检查过期时间的定时任务线程
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
- 查看获取检查线程的scheduleExpirationRenewal()方法,在该方法中最先会执行renewExpiration(),查看该方法
private void scheduleExpirationRenewal(long threadId) {
RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
//首先执行的该方法
this.renewExpiration();
}
}
//renewExpiration()方法插入执行定时任务,定时检查超时时间,执行方法内部的
private void renewExpiration() {
RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
//开启一个线程
public void run(Timeout timeout) throws Exception {
RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
if (ent != null) {
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
} else {
if (res) {
//2.续时,每次续时30秒
RedissonLock.this.renewExpiration();
}
}
});
}
}
}
//1.internalLockLeaseTime /3l: 也就是传进来的leaseTime超时时间默认30秒/3l=10秒
//也就是当前开启的检查超时时间的定时任务执行的定时时间,默认10秒,默认情况下相当于每10秒中会执行一次
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}
- watchdog总结: 默认情况下获取到锁成功后持有锁时间为30秒,每十秒扫描一次判断是否还持有锁,如果持有续时,每次默认续时30秒,否则传递leaseTime情况下每leaseTime/3检查一次,每次续时leaseTime秒
1)查看Redisson 中的 lock()获取锁方法发现Redisson实现了"java.util.concurrent.locks " JUC 并发包下的接口实现类RedissonLock,查看lock源码
2)在lock()方法中调用了tryAcquire(-1L, leaseTime, unit, threadId),发现在不设置leaseTime的情况下默认传递了-1
3)查看tryAcquire()方法,在该方法中首先会判断leaseTime是不是等于-1,如果等于-1,执行tryLockInnerAsync()时拿了一个lockWatchdogTimeout = 30000L ,并且该方法执行会返回一个RFuture,这是一个FutureTask,到这里确认到锁持有锁的默认时间为30秒
4)上面拿到了一个RFuture,通过返回的RFuture执行 onComplete(),内部调用了scheduleExpirationRenewal()方法,插入了一个检查过期时间的定时任务线程
5)查看scheduleExpirationRenewal()方法,内部调用renewExpiration(),该方法中内部run了一个线程,该线程是个定时任务,定时时间为"internalLockLeaseTime / 3L",internalLockLeaseTime 也就是刚刚传递下来的30秒,也就是默认情况下每10秒执行一次该定时任务,检查是否还持有锁,如果持有续时30秒
锁的可重入性分析
- 根据上面分析的在lock()方法中会调用到tryAcquireAsync(),在该方法内部会执行tryLockInnerAsync(),该方法中有一段lua脚本,
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil;" +
" end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
- 对lua脚本解释:
1)首先判断添加的key是否存在"redis.call(‘exists’, KEYS[1])" 等于0为不存在
2)如果不存在进行加锁"redis.call(‘hincrby’, KEYS[1], ARGV[2], 1)", 其中"KEYS[1]"也就是加锁的key,"ARGV[1]"表示加锁的客户端id,后面的"1"为加锁次数
3)"redis.call(‘pexpire’, KEYS[1], ARGV[1]); " 表示设置过期时间,前面通过看门狗了解到默认30秒
4) 如果判断锁已经存在执行下一个if,会判断是否是当前线程,如果是当前线程会增加加锁次数
5)如果锁已经存在并且不是当前线程,会返回过期时间ttl
- 了解到锁的可重入性,在我们加锁时,加几次锁就要释放几次(前面多客户端分别加锁情况下)
释放锁分析
- 查看unlock()方法内部调用了unlockAsync(),首先发现释放锁是异步执行的,在该方法中通过执行unlockInnerAsync()去释放锁,并拿到一个RFuture可以理解为FutureTask,
- 通过拿到的FutureTask中执行cancelExpirationRenewal() 取消watchdog获取锁是插入的定时续期的定时任务
public void unlock() {
try {
//1.查看该方法中的unlockAsync()
this.get(this.unlockAsync(Thread.currentThread().getId()));
} catch (RedisException var2) {
if (var2.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException)var2.getCause();
} else {
throw var2;
}
}
}
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise();
//1.unlockInnerAsync()释放锁逻辑
RFuture<Boolean> future = this.unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
//2.取消watchdog看门狗续期的定时任务
this.cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
} else if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
result.tryFailure(cause);
} else {
result.trySuccess((Object)null);
}
});
return result;
}
- 查看 unlockInnerAsync(),该方法内部又是一堆lua脚本,通过脚本删除redis中的key释放锁,重点里面有一个"local counter = redis.call(‘hincrby’, KEYS[1], ARGV[3], -1)",这个就是重入锁释放时,累减操作,所以重入锁时加了几次,就需要释放几次
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId));
}