1.绪论
为了解决并发问题,我们可以通过加锁的方式来保证数据的一致性,比如java中的synchronize关键字或者ReentrantLock,但是他们只能在同一jvm进程上加锁。现在的项目基本上都是分布式系统,如何对多个java实例进行加锁,这就需要用到分布式锁。分布式锁可以由多种实现方式,本文将要介绍的就是采用redis实现的方式。
2.分布式锁简单实现
2.1 基本原理
redis的分布式锁实现其实是由redis中的setnx命令来实现的,当线程1需要获取锁时,只需要调用setnx lockkey val,如果设置成功,便表示加锁成功。线程2到达时,同样调用setnx lockkey val方法,但是发现redis中已经存在lockkey时,便加锁失败。线程1执行完业务逻辑过后,便可以调用del key删除对应的key,这样便完成解锁。
setnx key val 表示当redis中不存在这个key便设置成功,否者设置失败。
3.如果set key成功,但是del key失败怎么办
3.1 问题
如果线程1在setnx key val调用成功过后,并且执行完业务逻辑,宕机了,此时相当于key便一直存在redis中,这样其他线程便一直不能获取到锁,导致死锁。
3.2 解决方案
我们可以在给key设置一个过期时间,这样利用redis过期策略进行兜底,就算线程1set key成功后宕机了,因为有过期时间,也能保证在一段时间后,这个key会被删除掉。
setnx key val
expire key timeout
可以看出可以使用上面两个命令,来对key设置过期时间,但是还是有问题,就是set key和expire key之间不是一个原子操作,这样会导致在加锁的时候会有并发问题。所以,我们相当redis中的lua脚本,来保证set key和expire key之间是一个原子操作。对于string字符串,redis提供了一个符合命令,来实现上面两个命令的功能:
set key val NX PX timeout
4 如何解决锁误删问题并实现可重入锁功能
4.1 问题描述
4.1.1 锁误删问题
1.线程1获取到锁,过后,由于设置了可以的超时时间,锁过期。
2.线程2来获取锁便能获取成功。
3.线程1执行完成,释放锁,调用del key,删除key成功。
4.线程3加锁,set key成功,但是此时线程2还在执行。
其实上面本质上就是线程释放了不是自己加的锁。
4.1.2 可重入锁
在java中的reentrantLock和syncronized都有可重入功能,即线程在获取到锁过后,能够再次获取当前锁,并且可冲入次数加1,如果释放锁时,可重入次数减1。
4.2 解决方案
既然上面线程释放了不是自己加的锁导致锁误删问题,我们可以在加锁是将线程id,记录到key中,这样,每次释放锁的时候,判断一下是否是本线程程加的锁,如果不是便直接返回,如果是便释放锁,就可以了。
而对于可重入问题,我们在记录线程id的时候,我们可以记录一下重入次数,每次重入的时候,重入加1,释放锁的时候,重入次数减1,减为0便删除key就可以了。
针对上面两点,我们可以采用hash结构来存储key,其中field为线程id,value为重入次数。
所以加锁变为:
//获取重入次数
local time = hget key threadId
//如果未加锁,设置重入次数为1
if(nil == time) then
hset key threadId 1
//如果已经加锁,设置重入次数自增
else
hincr key threadId
end
expire key 过期时间
释放锁为
//获取可重入次数
local time = hget key threadId
//如果没有threadId这个feild,表示该key已经过期,不用删除
if(nil = time) then
return 0
end
//表示当前只加锁了一次,删除锁
if (time < 2) then
del key
else
//否者重入次数减1
hincr key threadId -1
end
4.3 代码实现
4.3.1 java代码
public class SimpleRedisLock implements ILock {
private String name;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
private static final DefaultRedisScript<Long> LOCK_SCRIPT;
static {
LOCK_SCRIPT = new DefaultRedisScript<>();
LOCK_SCRIPT.setLocation(new ClassPathResource("lock.lua"));
LOCK_SCRIPT.setResultType(Long.class);
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 调用lua脚本
Long result = stringRedisTemplate.execute(LOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId(),
timeoutSec);
if (result == null) {
return false;
}
return 1 == result;
}
@Override
public void unlock() {
// 调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}
}
注意:java和lua的交互execute方法 /** * @param script lua脚本 * @param keys 需要操作的key,在lua脚本中用KEYS数组来接收,从1开始 * @param args 其他参数,在lua脚本中用ARGVS数组来接收,从1开始 * @param <T> * @return */ public <T> T execute(RedisScript<T> script, List<K> keys, Object... args) { return this.scriptExecutor.execute(script, keys, args); }
4.3.2 加锁lua脚本
--1.获取参数列表
--加锁key在keys数组中
local lockKey = KEYS[1]
--线程id
local threadId = ARGV[1]
--超时时间
local timeout = ARGV[2]
--2.获取重入次数
local time = redis.call('hget', lockKey, threadId)
--3.如果未加锁,设置重入次数为1
if (nil == time) then
redis.call('hset', lockKey, threadId, 1)
--如果已经加锁,设置重入次数自增
else
redis.call('hincr', lockKey, threadId)
end
--4.设置过期时间
redis.call('expire', lockKey, timeout)
return 1
4.3.3 释放锁lua脚本
--1.获取参数列表
--加锁key在keys数组中
local lockKey = KEYS[1]
--线程id
local threadId = ARGV[1]
--2.获取重入次数
local time = redis.call('hget', lockKey, threadId)
--3.如果没有threadId这个feild,表示该key已经过期,不用删除
if(nil == time) then
return 1
end
--4.如果重入次数为1,表示当前只加锁了一次,删除锁
if (time < 2) then
redis.call('del', lockKey)
else
--5.否者重入次数减1
redis.call('hincr', lockKey,threadId)
end
return 1
lua脚本调用redis命令
redis.call('命令名称', 'key', '其它参数', ...)
5.未获取锁时进行重试实现
5.1 问题描述
前面实现的分布式锁时,当线程1获取锁时,线程2尝试获取锁失败,便会直接返回失败。我们如果要线程2在获取锁失败后,在一段时间内尝试获取锁,如果超所该时间,才返回失败,应该如何实现呢?
5.2 问题解决
在线程获取锁失败时,我们可以进行自旋直到获取锁成功为止,但是这样会消耗资源。所以我们可以通过redis的发布订阅机制,当线程获取锁失败过后,订阅加锁的key,然后阻塞。当其他线程释放锁的时候,会给我们发送一个通知,唤醒当前线程。
//订阅某个频道
SUBSCRIBE channel [channel ...]
5.3 redisson源码分析
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
//获取线程等待时间
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
//尝试获取锁,如果获取失败,返回当前key还有多久过期
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
//等待时间扣减从当前方法进入到执行到这里花费时间
time -= System.currentTimeMillis() - current;
//如果小于0表示已经过期
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
//定于当前key
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
//真正的等待逻辑
while (true) {
long currentTime = System.currentTimeMillis();
//再次获取锁,如果失败得到锁还有多久过期
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
//如果key被删除,会pulish消息,唤醒当前线程
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
6.如何解决锁续期问题
6.1 问题描述
我们在实现分布式锁的时候,会设置过期时间,超过这个过期时间,这个key便会被自动删除。假设超过这个过期时间,当前业务逻辑还未执行完成,这样其他线程能拿到锁,会导致并发问题。
6.2 解决方案
我们可以设置一个线程,专门用来监听当前业务逻辑是否完成,如果未完成,便对key的时间进行续期,在redission实现的分布式锁中,这个线程被称作watchDog。我们来看看redisson中是如何实现的。
6.3 redisson源码分析
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
//如果设置锁自动释放时间不等于-1,走正常逻辑
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//如果锁自动释放时间等于-1,开启watchDog,并且设置锁自动释放时间为30s
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
//启动watchDog线程,为当前线程进行锁续期
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
加锁的流程:可以看出redission实现的分布式锁的lua逻辑其实和我们上面是差不多的。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
锁续期流程:
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
//其实就是将当前的线程id封装成一个entry,并且加入到一个Map中,然后调用renewExpiration方法
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();
}
}
private void renewExpiration() {
//从当前map中获取监听的entry
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//启动一个定时任务
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
//从map中取出需要续约的线程id
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
//将线程id的过期时间重置为30s
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
//递归调用,一直监听存活的线程
renewExpiration();
}
});
}
//时间为10会触发一次定时任务
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
简单而言,其实就是启动一个定时线程,一直扫描存活的key,如果未过期,便重置其过期时间为30s。可以看出,当线程调用了unlock方法,便会停止锁续期。
6.4watchDog的几个问题
6.4.1 客户端宕机,watchDog是否会一直续期
答案是不会的,如果客户端宕机,证明当前jvm实例已经挂掉,所以执行watchDog的线程自然也挂掉了。
6.4.2 unLock失败,watchDog是否会一直续期
答案是不会的,在redisson的解锁方法中,会用到一个CompletionStage,它能保证无论删除key是否抛出异常,都能将当前线程id的锁续期任务从EXPIRATION_RENEWAL_MAP中移除。
7.redis主从一致性
7.1 问题描述
如果redis采用单机部署的话,redis宕机,导致整个服务都不可用。如果redis采用集群部署,但是会有主动不同步的问题,比如线程1将key加入到主节点,但是主节点还未将数据同步到从节点宕机,选举从节点为新的主节点,这个主节点并没有线程1设置的key,导致线程安全问题。\
7.2 联锁
7.2.1 实现原理
其实就是部署多个redis单击实例,当加锁的时候,向每个redis实例都发送setnx key val请求,当所有的redis实例都返回成功,才认为成功。
7.3 红锁
联锁是要求所有的锁加锁成功,才表示加锁成功,而红锁是只需要满足大于n/2+1个节点加锁成功,便成功。