源码仓库地址:git@gitee.com:chuangchuang-liu/hm-dingping.git
1、redission介绍
目前基于redis的setnx特性实现的自定义分布式锁仍存在的问题:
问题 | 描述 |
---|---|
重入问题 | 同一个线程无法多次获取统一把锁。当方法A成功获取锁后,调用方法B,方法B也要获取锁,此时由于锁是不可重入的,也就是被方法A占用着,此时就产生了死锁的问题 |
不可重试 | 自定义分布式锁无失败重试机制 |
超时释放 | 锁的超时释放虽然可以避免死锁问题,但确实也可能存在业务执行时间比较长的情况,那这种情况下就仍存在安全隐患问题 |
主从一致性 | 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。 |
什么是Redission?
Redission是一个用于Java的Redis客户端,它提供了丰富的特性,包括内存数据网格的功能。它支持同步/异步/RxJava/Reactive API,拥有超过50种基于Redis的Java对象和服务。Redission的使用非常简单,没有学习曲线,您不需要了解任何Redis命令就可以开始使用。(GitHub - redisson/redisson, Redisson官网)
Redission可以让Java应用更方便地访问和操作Redis数据存储,适合于需要高性能和高并发的应用场景。
2、快速开始
- 导入依赖
<!--redission-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
- Redission配置客户端
@Configuration
public class RedisConfig {
@Bean
public RedissonClient redisClient(){
Config config = new Config();
// 可以用"rediss://"来启用SSL连接
config.useSingleServer().setAddress("redis://192.168.224.128:6379").setPassword("123456");
return Redisson.create(config);
}
}
- 使用Redission分布式锁
@Resource
private RedissionClient redissonClient;
@Test
void testRedisson() throws Exception{
//获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");
//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);
//判断获取锁成功
if(isLock){
try{
System.out.println("执行业务");
}finally{
//释放锁
lock.unlock();
}
}
}
3、redission可重入锁原理
3.1、原理介绍
在Lock锁中,他是借助于底层的一个voaltile的一个state变量来记录重入的状态的,比如当前没有人持有这把锁,那么state=0,假如有人持有这把锁,那么state=1,如果持有这把锁的人再次持有这把锁,那么state就会+1 ,如果是对于synchronized而言,他在c语言代码中会有一个count,原理和state类似,也是重入一次就加一,释放一次就-1 ,直到减少成0 时,表示当前这把锁没有被人持有。 、
而在redission中,也支持这种可重入锁原理,是通过redis的hash数据结构实现的。其中key表示这把锁是否存在,field判断锁是被哪个线程持有,value则记录锁被持有次数。
3.2、源码剖析
- 获取锁
其中各参数解释:
KEYS[1]:锁的名称
ARGV[1]:锁过期时间
ARGV[2]:id + “:” + threadId
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);"
判断锁是否存在
如果不存在,则设置当前线程标识,计数器+1;设置过期时间;
如果存在。做二次判断,判断锁的持有线程是不是自己?
如果是,计数器+1,重置锁的过期时间;
如果不是,获取锁失败,返回锁的剩余过期时间
- 释放锁
其中各参数解释:
KEYS[1]:锁的名称
KEYS[2]:订阅频道
ARGV[1]:是要发布的消息内容
ARGV[2]:锁过期时间
ARGV[3]:id + “:” + threadId
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;"
判断锁是不是当前线程?
不是==>直接返回
是==>计数器–
二次判断,判断计数器是否大于0
大于0==>重置锁过期时间
否则==>真正释放锁
4、redission锁重试和WatchDog机制
4.1、redission是如何解决不可重试的?
源码剖析:
用户调用tryLock方法时,指定waitTime最大等待时间
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
- 计算等待时间和获取当前时间:将用户指定的等待时间转换为毫秒,并记录方法调用时的当前时间。
- 尝试获取锁。如果ttl为空,则获取锁成功;否则,返回的是其他线程占用锁的剩余有效时间
- 检查剩余等待时间。如果time小于等于0,调用
acquireFailed
方法返回false- 订阅锁。通过
subscribe
方法订阅相关的锁。如果在剩余时间内未能订阅成功,处理取消订阅并调用acquireFailed
方法,返回false。- 循环等待锁释放消息。等待过程中会调用
tryAcquire
方法获取锁,如果获取成功返回true- 处理锁的ttl。如果ttl大于0,返回锁被其他线程占用的剩余过期时间(ttl)。更新剩余等待时间(time)。以time和ttl中较小的值继续等待再次尝试。
- 再次检查等于剩余等待时间。如果小于0,调用
acquireFailed
方法返回false- 循环结束后(要么获取锁成功,要么超过最大等待时间了),最终调用
unsubscribe
方法取消订阅
结论1:redission不是获取锁失败后立即进行重试,而是等待“一定时间”后再进行重试,节省了一定的CPU资源,对服务器性能有一定提升;
结论2:一定要采取调用tryLock
方法携带参数waitTime的重载方法,其他重载的tryLock方法底层是不具备重试机制的。
4.2、redission是如何解决锁超时释放的-看门狗机制?
自定义分布式锁仍存在的一个问题是:锁的超时释放虽然可以避免死锁问题,但确实也可能存在业务执行时间比较长的情况,那这种情况下业务还未执行完毕,锁就被释放了,存在一定的安全隐患。
源码剖析:
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining) {
// 开启任务更新过期时间
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
- 如果没有指定leaseTime,那么底层会默认传入commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()看门狗时间
- 在leasetime的1/3处时间,会创建一个任务
renewExpirationAsync
方法来异步地更新重置锁过期时间- 递归地调用自身来更新锁过期时间,直到业务处理完毕。
至此redission解决了因业务阻塞而导致锁提前释放的问题
业务执行完毕,释放锁源码剖析:
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
// 释放锁
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
// 取消锁失效时间更新重置任务
cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
result.trySuccess(null);
});
return result;
}
void cancelExpirationRenewal(Long threadId) {
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
return;
}
if (threadId != null) {
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
timeout.cancel();
}
// 删除递归更新锁时间任务
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
}
当业务执行完毕且锁正常释放后,删除递归更新锁时间任务,避免redission一直递归创建任务更新锁过期时间
5、redission锁的MultiLock原理
为了提高redis的可用性,我们会搭建集群或者主从,现在以主从为例
此时我们去写命令,写在主机上, 主机会将数据同步给从机,但是假设在主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个slave变成master,而此时新的master中实际上并没有锁信息,此时锁信息就已经丢掉了。
为了解决该问题,redission的方案是去掉redis集群主从关系,每一个节点都是平等的。加锁逻辑是需要写入到每一个节点上才算加锁成功。这样,当某一台机器宕机了,这台机器的slave节点变为master节点,此时另一个线程趁虚而入,虽然可以正常写入,但其它机器仍会写入失败,最终结果仍是获取锁失败,从而保证了获取锁的可靠性。
MulitLock源码剖析:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// try {
// return tryLockAsync(waitTime, leaseTime, unit).get();
// } catch (ExecutionException e) {
// throw new IllegalStateException(e);
// }
long newLeaseTime = -1;
if (leaseTime != -1) {
if (waitTime == -1) {
newLeaseTime = unit.toMillis(leaseTime);
} else {
newLeaseTime = unit.toMillis(waitTime)*2;
}
}
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
long lockWaitTime = calcLockWaitTime(remainTime);
int failedLocksLimit = failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList<>(locks.size());
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
lockAcquired = false;
}
if (lockAcquired) {
acquiredLocks.add(lock