Redis源码分析之tryLock(long waitTime, long leaseTime, TimeUnit unit)

时间:2025-03-22 09:26:39

 RLock的tryLock方法

public boolean tryLock();
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;

1、当调用

(0,30,)时

源码:

@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = (waitTime);
        long current = ();
        final long threadId = ().getId();
        // 尝试获取锁,并返回剩余超时时间
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        // 如果ttl为空则说明锁未被其他客户端持有
        if (ttl == null) {
            return true;
        }
        // 检查是否超过等待时间 超过则返回false
        time -= (() - current);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        current = ();
        // 当前线程进行订阅
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!await(subscribeFuture, time, )) {
            if (!(false)) {
                (new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            time -= (() - current);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
            // 在等待时间内 重复尝试获取锁 直到超过等待时间或成功获取锁
            while (true) {
                long currentTime = ();
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= (() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message
                currentTime = ();
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, );
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, );
                }

                time -= (() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId));
} 
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {
        //注意最后一个参数
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, , threadId, RedisCommands.EVAL_LONG);
    (new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!()) {
                return;
            }

            Long ttlRemaining = ();
            // lock acquired
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = (leaseTime);

    return (getName(), , command,
              "if (('exists', KEYS[1]) == 0) then " +
                  "('hset', KEYS[1], ARGV[2], 1); " +
                  "('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "if (('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "('hincrby', KEYS[1], ARGV[2], 1); " +
                  "('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "return ('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
private void scheduleExpirationRenewal(final long threadId) {
    if ((getEntryName())) {
        return;
    }
    //新建定时任务,每隔1/3过期时间则刷新过期时间
    Timeout task = ().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            
            RFuture<Boolean> future = (getName(), , RedisCommands.EVAL_BOOLEAN,
                    "if (('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                    "end; " +
                    "return 0;",
                      Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
            
            (new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    (getEntryName());
                    if (!()) {
                        ("Can't update lock " + getName() + " expiration", ());
                        return;
                    }
                    
                    if (()) {
                        // reschedule itself
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
        }
    }, internalLockLeaseTime / 3, );
    // 如果传入key对应的value已经存在,就返回存在的value,不进行替换。如果不存在,就添加key和value,返回null
    if ((getEntryName(), task) != null) {
        ();
    }
}

 

lua脚本解释:

--检查key是否被占用了,如果没有则设置超时时间和唯一标识,初始化value=1
if (('exists', KEYS[1]) == 0) then
  ('hset', KEYS[1], ARGV[2], 1);
  ('pexpire', KEYS[1], ARGV[1]);
  return nil; 
end; 
--如果锁重入,需要判断锁的key field 都一致情况下 value 加一 
if (('hexists', KEYS[1], ARGV[2]) == 1) then 
  ('hincrby', KEYS[1], ARGV[2], 1);
  --锁重入重新设置超时时间  
  ('pexpire', KEYS[1], ARGV[1]); 
  return nil; 
end;
--返回剩余的过期时间
return ('pttl', KEYS[1]);

参数:

KEYS[1](getName()) :需要加锁的key

ARGV[1](internalLockLeaseTime) :锁的超时时间,防止死锁

ARGV[2](getLockName(threadId)) :锁的唯一标识, id(()) + “:” + threadId

如上lua脚本等同于在redis控制台执行

exists lbhTestLock

hset lbhTestLock 32131231:21 1

pexpire lbhTestLock 3000