RLock的tryLock方法
public boolean tryLock();
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
1、当调用 lock.tryLock(0, 30, TimeUnit.SECONDS) 时
源码:
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = (waitTime);
long current = ();
final long threadId = ().getId();
// 尝试获取锁,并返回剩余超时时间
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
// 如果ttl为空则说明锁未被其他客户端持有
if (ttl == null) {
return true;
}
// 检查是否超过等待时间 超过则返回false
time -= (() - current);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
current = ();
// 当前线程进行订阅
final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!await(subscribeFuture, time, )) {
if (!(false)) {
(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (()) {
unsubscribe(subscribeFuture, threadId);
}
}
});
}
acquireFailed(threadId);
return false;
}
try {
time -= (() - current);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// 在等待时间内 重复尝试获取锁 直到超过等待时间或成功获取锁
while (true) {
long currentTime = ();
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= (() - currentTime);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// waiting for message
currentTime = ();
if (ttl >= 0 && ttl < time) {
getEntry(threadId).getLatch().tryAcquire(ttl, );
} else {
getEntry(threadId).getLatch().tryAcquire(time, );
}
time -= (() - currentTime);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
/**
 * Synchronous counterpart of {@code tryAcquireAsync}: starts the asynchronous attempt and
 * passes the resulting future to {@code get} to obtain its value.
 *
 * @param leaseTime lease time forwarded to {@code tryAcquireAsync}
 * @param unit      time unit of {@code leaseTime}
 * @param threadId  id of the thread requesting the lock
 * @return the value {@code get} yields for the acquire future
 */
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> pendingAttempt = tryAcquireAsync(leaseTime, unit, threadId);
    return get(pendingAttempt);
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
//注意最后一个参数
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, , threadId, RedisCommands.EVAL_LONG);
(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!()) {
return;
}
Long ttlRemaining = ();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = (leaseTime);
return (getName(), , command,
"if (('exists', KEYS[1]) == 0) then " +
"('hset', KEYS[1], ARGV[2], 1); " +
"('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (('hexists', KEYS[1], ARGV[2]) == 1) then " +
"('hincrby', KEYS[1], ARGV[2], 1); " +
"('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return ('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
private void scheduleExpirationRenewal(final long threadId) { if ((getEntryName())) { return; } //新建定时任务,每隔1/3过期时间则刷新过期时间 Timeout task = ().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { RFuture<Boolean> future = (getName(), , RedisCommands.EVAL_BOOLEAN, "if (('hexists', KEYS[1], ARGV[2]) == 1) then " + "('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); (new FutureListener<Boolean>() { @Override public void operationComplete(Future<Boolean> future) throws Exception { (getEntryName()); if (!()) { ("Can't update lock " + getName() + " expiration", ()); return; } if (()) { // reschedule itself scheduleExpirationRenewal(threadId); } } }); } }, internalLockLeaseTime / 3, ); // 如果传入key对应的value已经存在,就返回存在的value,不进行替换。如果不存在,就添加key和value,返回null if ((getEntryName(), task) != null) { (); } }
lua脚本解释:
-- Lock is free (the key does not exist): create the hash with this holder's field
-- initialised to 1 and set the expiry on the key.
if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('hset', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
-- Re-entrant acquire: the key exists and already contains this holder's field,
-- so increment that field's counter by one.
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    -- A re-entrant acquire also resets the expiry.
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
-- Held by a different holder: return the key's remaining time to live.
return redis.call('pttl', KEYS[1]);
参数:
KEYS[1](getName()) :需要加锁的key
ARGV[1](internalLockLeaseTime) :锁的超时时间,防止死锁
ARGV[2](getLockName(threadId)) :锁的唯一标识,id(UUID.randomUUID()) + “:” + threadId
如上lua脚本等同于在redis控制台执行
exists lbhTestLock
hset lbhTestLock 32131231:21 1
pexpire lbhTestLock 3000