In order to acquire the lock, the client performs the following operations:

1、It gets the current time in milliseconds.

2、It tries to acquire the lock in all the N instances sequentially, using the same key name and random value in all the instances. During step 2, when setting the lock in each instance, the client uses a timeout which is small compared to the total lock auto-release time in order to acquire it. For example if the auto-release time is 10 seconds, the timeout could be in the ~ 5-50 milliseconds range. This prevents the client from remaining blocked for a long time trying to talk with a Redis node which is down: if an instance is not available, we should try to talk with the next instance ASAP.

3、The client computes how much time elapsed in order to acquire the lock, by subtracting from the current time the timestamp obtained in step 1. If and only if the client was able to acquire the lock in the majority of the instances (at least 3 when N = 5), and the total time elapsed to acquire the lock is less than the lock validity time, the lock is considered to be acquired.

4、If the lock was acquired, its validity time is considered to be the initial validity time minus the time elapsed, as computed in step 3.

5、If the client failed to acquire the lock for some reason (either it was not able to lock N/2+1 instances or the validity time is negative), it will try to unlock all the instances (even the instances it believed it was not able to lock).
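
The five steps above can be sketched in Java roughly as follows. This is a minimal illustration, not a real Redlock client: `FakeInstance` is a hypothetical in-memory stand-in for one Redis instance, key expiry is not simulated, and an unavailable instance simply refuses the request where a real client would hit its short per-instance timeout.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for one independent Redis instance (not a real client). */
class FakeInstance {
    private final Map<String, String> store = new HashMap<>();
    boolean available = true;

    /** Models "set the key only if it is absent"; expiry is not simulated. */
    boolean tryLock(String key, String value) {
        if (!available) {
            return false; // a real client would give up after its short timeout
        }
        return store.putIfAbsent(key, value) == null;
    }

    /** Deletes the key only if it still holds our random value. */
    void unlock(String key, String value) {
        if (available) {
            store.remove(key, value);
        }
    }

    boolean holds(String key) {
        return store.containsKey(key);
    }
}

class RedlockSketch {
    /** Returns the remaining validity time in ms, or -1 if the lock was not acquired. */
    static long acquire(List<FakeInstance> instances, String key, String value, long ttlMillis) {
        long start = System.currentTimeMillis();            // step 1
        int locked = 0;
        for (FakeInstance instance : instances) {           // step 2
            if (instance.tryLock(key, value)) {
                locked++;
            }
        }
        long elapsed = System.currentTimeMillis() - start;  // step 3
        long validity = ttlMillis - elapsed;                // step 4
        int quorum = instances.size() / 2 + 1;
        if (locked >= quorum && validity > 0) {
            return validity;
        }
        for (FakeInstance instance : instances) {           // step 5: unlock everywhere
            instance.unlock(key, value);
        }
        return -1;
    }
}
```

With five instances of which two are down, `RedlockSketch.acquire` still reaches the quorum of three; a second client using a different random value then fails and its cleanup leaves the first client's keys untouched.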



3、Redis commands used in the source code

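SETNX key value (SET if Not eXists): sets key to value and returns 1 if and only if key does not exist; if key already exists, SETNX does nothing and returns 0. See: SETNX command

GETSET key value: sets key to value and returns the old value of key. It returns an error if key exists but does not hold a string, and nil if key did not exist. See: GETSET command

GET key: returns the string value stored at key, or nil if key does not exist. See: GET command

DEL key [key …]: deletes one or more keys; keys that do not exist are ignored. Returns the number of keys actually deleted (integer). See: DEL command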

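HSET key field value: sets field to value in the hash stored at key, creating the hash if key does not exist. Returns 1 if field is new, and 0 if field already existed and its value was updated. See: HSET command

HEXISTS key field: returns 1 if the hash stored at key contains field, and 0 if either key or field does not exist. See: HEXISTS command

HINCRBY key field increment: adds increment to the number stored at field in the hash stored at key. If key does not exist, a new key holding a hash is created. If field does not exist, its value is set to 0 before the operation. Returns the value after the increment. See: HINCRBY command

PEXPIRE key milliseconds: sets the key's time to live in milliseconds; EXPIRE does the same in seconds. See: PEXPIRE command

PUBLISH channel message: posts message to channel and returns the number of clients that received it. See: PUBLISH command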
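
The return values of the three hash commands matter later, because the lock scripts branch on them. As a rough aid, here is an in-memory model of HSET, HEXISTS and HINCRBY. The class `HashCommandsSketch` is made up for illustration and only mirrors the return-value rules described above; it does not talk to Redis.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of three hash commands; names and structure are illustrative only. */
class HashCommandsSketch {
    private final Map<String, Map<String, String>> db = new HashMap<>();

    /** HSET: returns 1 if the field is new, 0 if an existing field was overwritten. */
    long hset(String key, String field, String value) {
        Map<String, String> hash = db.computeIfAbsent(key, k -> new HashMap<>());
        return hash.put(field, value) == null ? 1 : 0;
    }

    /** HEXISTS: returns 1 only if both the key and the field exist. */
    long hexists(String key, String field) {
        Map<String, String> hash = db.get(key);
        return hash != null && hash.containsKey(field) ? 1 : 0;
    }

    /** HINCRBY: a missing key or field counts as 0; returns the value after the increment. */
    long hincrby(String key, String field, long increment) {
        Map<String, String> hash = db.computeIfAbsent(key, k -> new HashMap<>());
        long updated = Long.parseLong(hash.getOrDefault(field, "0")) + increment;
        hash.put(field, Long.toString(updated));
        return updated;
    }
}
```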


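import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

public class RedisLockTest {

    public static void main(String[] args) throws InterruptedException {
        Config config = new Config();
        config.useSentinelServers().addSentinelAddress("", "").setMasterName("master");
        RedissonClient redissonClient = Redisson.create(config);

        RLock lock = redissonClient.getLock("LOCKER_PREFIX" + "test_lock");
        try {
            boolean isLock = lock.tryLock();
            //            isLock = lock.tryLock(100, 1000, );
            if (isLock) {
                // do the work that needs the lock
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // unlock only if this thread actually holds the lock
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}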

lock.tryLock() -> tryAcquireOnceAsync -> tryLockInnerAsync
lock.tryLock(100, 1000, ) -> tryLock(long waitTime, long leaseTime, TimeUnit unit)

II、 Source analysis: tryLock() with no arguments


private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
    // an explicit lease time was given: lock with it and do not start the renewal task
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    }
    RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
        @Override
        public void operationComplete(Future<Boolean> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }

            Boolean ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}
#### 1.1、 tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              "if (redis.call('exists', KEYS[1]) == 0) then " + // the lock key does not exist
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " + // create a hash under the key test_lock with one field named after the thread id and value 1: this thread's reentrancy count is 1
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " + // give the hash an expiry so that a crashed client cannot leave a deadlock behind
                  "return nil; " + // return nil: lock acquired
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // the lock exists and is held by the current thread
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // increment this thread's reentrancy count
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " + // and reset the lock's expiry
                  "return nil; " + // return nil: lock acquired
              "end; " +
              "return redis.call('pttl', KEYS[1]);", // the lock exists but is held by another thread: return its remaining time to live
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

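KEYS[1] is getName(), the lock name (test_lock).
ARGV[1] is internalLockLeaseTime, passed in milliseconds; the default is 30 s.
ARGV[2] is getLockName(threadId), of the form id:threadId. It combines the lock object's id with the thread id to identify the calling thread, which also tells apart threads on different servers.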
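
To make the three branches of the script easier to follow, here is a small in-memory model of it for a single lock key. It is only a sketch under stated assumptions: `LockScriptSketch` and its methods are invented for illustration, the clock is passed in explicitly as `now`, and key expiry is modelled by clearing the hash once the expiry time has passed.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of the locking script for one lock key (illustrative, not Redisson code). */
class LockScriptSketch {
    private final Map<String, Integer> holds = new HashMap<>(); // field (owner) -> reentrancy count
    private long expiresAt;                                      // absolute expiry time in ms

    /** Returns null when the lock is acquired, otherwise the holder's remaining TTL in ms. */
    Long tryLock(String owner, long leaseMillis, long now) {
        if (!holds.isEmpty() && now >= expiresAt) {
            holds.clear();                       // models Redis expiring the key
        }
        if (holds.isEmpty()) {                   // exists == 0
            holds.put(owner, 1);                 // hset key owner 1
            expiresAt = now + leaseMillis;       // pexpire
            return null;
        }
        if (holds.containsKey(owner)) {          // hexists == 1
            holds.merge(owner, 1, Integer::sum); // hincrby key owner 1
            expiresAt = now + leaseMillis;       // pexpire
            return null;
        }
        return expiresAt - now;                  // pttl
    }

    int holdCount(String owner) {
        return holds.getOrDefault(owner, 0);
    }
}
```

Calling `tryLock` twice with the same owner returns null both times and raises the hold count to 2, while a different owner gets back the remaining time to live instead of null.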

#### 1.2、 scheduleExpirationRenewal

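private void scheduleExpirationRenewal(final long threadId) {
    // a renewal task is already registered for this lock
    if (expirationRenewalMap.containsKey(getEntryName())) {
        return;
    }

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {

            // if the current thread still holds the lock, reset its expiry and return 1; otherwise return 0
            RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                    "end; " +
                    "return 0;",
                      Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

            future.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    expirationRenewalMap.remove(getEntryName());
                    if (!future.isSuccess()) {
                        log.error("Can't update lock " + getName() + " expiration", future.cause());
                        return;
                    }

                    if (future.getNow()) {
                        // reschedule itself
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    // another thread registered a task first: cancel ours
    if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
        task.cancel();
    }
}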
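
The renewal idea (run a task after one third of the lease time, and reschedule it for as long as the lock is still held) can be sketched without Redisson. In the sketch below, `RenewalSketch` and the `renew` callback are hypothetical; `renew` stands in for the script above and returns true when the expiry was extended. A plain `ScheduledExecutorService` replaces Redisson's timer.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Sketch of a lease watchdog; the class and its names are hypothetical. */
class RenewalSketch {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    /**
     * Calls renew after leaseMillis / 3 and keeps rescheduling itself until renew
     * returns false, which mirrors the script's "return 1" / "return 0".
     */
    void schedule(final BooleanSupplier renew, final long leaseMillis) {
        timer.schedule(() -> {
            if (renew.getAsBoolean()) {
                schedule(renew, leaseMillis); // reschedule itself
            }
        }, leaseMillis / 3, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        timer.shutdownNow();
    }
}
```

Renewing at one third of the lease leaves room for a couple of failed or delayed attempts before the key would actually expire.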

III、 Source analysis: tryLock with an expiration time


    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        final long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }

        // subtract the time spent on the first attempt from the wait budget
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            return false;
        }

        // subscribe to the unlock channel, waiting at most for the remaining budget
        current = System.currentTimeMillis();
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            return false;
        }

        try {
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                return false;
            }

            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }
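
The core of this method is the wait-budget loop: try to acquire, deduct the elapsed time, then sleep on a latch until an unlock signal arrives, the holder's TTL runs out, or the budget is used up. A stripped-down sketch of that loop follows. `WaitLoopSketch`, the `tryAcquire` supplier and the `latch` semaphore are hypothetical stand-ins for `tryAcquire(leaseTime, unit, threadId)` and `getEntry(threadId).getLatch()`, and the subscribe/unsubscribe steps are left out.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Sketch of the wait-budget loop; tryAcquire and latch are hypothetical stand-ins. */
class WaitLoopSketch {
    /**
     * tryAcquire returns null when the lock is acquired, otherwise the holder's remaining TTL in ms.
     * latch is released by whoever unlocks, like the pub/sub unlock message.
     */
    static boolean tryLock(Supplier<Long> tryAcquire, Semaphore latch, long waitMillis)
            throws InterruptedException {
        long time = waitMillis;
        while (true) {
            long current = System.currentTimeMillis();
            Long ttl = tryAcquire.get();
            if (ttl == null) {
                return true;                      // lock acquired
            }
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                return false;                     // wait budget used up
            }
            current = System.currentTimeMillis();
            // sleep until an unlock signal, the holder's TTL, or the end of the budget
            long sleep = (ttl >= 0 && ttl < time) ? ttl : time;
            latch.tryAcquire(sleep, TimeUnit.MILLISECONDS);
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                return false;
            }
        }
    }
}
```

Sleeping for the smaller of the TTL and the remaining budget means the waiter retries as soon as the lock could have expired, even if no unlock message ever arrives.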
1.1 tryAcquire

tryLock -> tryAcquire -> tryAcquireAsync -> tryLockInnerAsync

1.2 subscribe

tryLock -> subscribe(threadId) -> PublishSubscribe.subscribe(final String entryName, final String channelName, final ConnectionManager connectionManager)

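public RFuture<E> subscribe(final String entryName, final String channelName, final ConnectionManager connectionManager) {
    final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
    final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName);
    final RPromise<E> newPromise = new PromiseDelegator<E>(connectionManager.<E>newPromise()) {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return semaphore.remove(listenerHolder.get());
        }
    };

    Runnable listener = new Runnable() {

        @Override
        public void run() {
            // an entry already exists for this lock: reuse its subscription
            E entry = entries.get(entryName);
            if (entry != null) {
                entry.aquire();
                semaphore.release();
                entry.getPromise().addListener(new TransferListener<E>(newPromise));
                return;
            }

            E value = createEntry(newPromise);
            value.aquire();

            E oldValue = entries.putIfAbsent(entryName, value);
            if (oldValue != null) {
                oldValue.aquire();
                semaphore.release();
                oldValue.getPromise().addListener(new TransferListener<E>(newPromise));
                return;
            }
            // 1.2.1
            RedisPubSubListener<Object> listener = createListener(channelName, value);
            // 1.2.2
            connectionManager.subscribe(LongCodec.INSTANCE, channelName, listener, semaphore);
        }
    };
    semaphore.acquire(listener);
    listenerHolder.set(listener);

    return newPromise;
}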

tryLock -> subscribe(threadId) -> PublishSubscribe.subscribe -> createListener

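private RedisPubSubListener<Object> createListener(final String channelName, final E value) {
    RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {

        @Override
        public void onMessage(String channel, Object message) {
            // ignore messages from other channels
            if (!channelName.equals(channel)) {
                return;
            }

            PublishSubscribe.this.onMessage(value, (Long)message);
        }

        @Override
        public boolean onStatus(PubSubType type, String channel) {
            if (!channelName.equals(channel)) {
                return false;
            }

            // the subscription is confirmed: complete the entry's promise
            if (type == PubSubType.SUBSCRIBE) {
                value.getPromise().trySuccess(value);
                return true;
            }
            return false;
        }

    };
    return listener;
}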

tryLock -> subscribe(threadId) -> PublishSubscribe.subscribe -> createListener -> LockPubSub.onMessage

public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {

    public static final Long unlockMessage = 0L;

    @Override
    protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
        return new RedissonLockEntry(newPromise);
    }

    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(unlockMessage)) {
            // Release one permit, which wakes a thread waiting in getLatch().tryAcquire so that it retries the lock.
            value.getLatch().release();

            // If the entry has other listener callbacks queued, wake them up and run them too.
            while (true) {
                Runnable runnableToExecute = null;
                synchronized (value) {
                    Runnable runnable = value.getListeners().poll();
                    if (runnable != null) {
                        if (value.getLatch().tryAcquire()) {
                            runnableToExecute = runnable;
                        } else {
                            value.addListener(runnable);
                        }
                    }
                }

                if (runnableToExecute != null) {
                    runnableToExecute.run();
                } else {
                    return;
                }
            }
        }
    }
}

tryLock -> subscribe(threadId) -> PublishSubscribe.subscribe -> connectionManager.subscribe

public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?> listener, AsyncSemaphore semaphore) {
    RPromise<PubSubConnectionEntry> promise = newPromise();
    subscribe(codec, channelName, listener, promise, PubSubType.SUBSCRIBE, semaphore);
    return promise;
}

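tryLock -> subscribe(threadId) -> PublishSubscribe.subscribe -> connectionManager.subscribe -> subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?> listener, final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock)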

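private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?> listener,
        final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock) {
    // the channel already has a connection: just add the listener to it
    final PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
    if (connEntry != null) {
        connEntry.addListener(channelName, listener);
        connEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
            @Override
            public void operationComplete(Future<Void> future) throws Exception {
                lock.release();
                promise.trySuccess(connEntry);
            }
        });
        return;
    }

    freePubSubLock.acquire(new Runnable() {

        @Override
        public void run() {
            if (promise.isDone()) {
                return;
            }

            // no free pub/sub connection: open a new one
            final PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
            if (freeEntry == null) {
                connect(codec, channelName, listener, promise, type, lock);
                return;
            }

            int remainFreeAmount = freeEntry.tryAcquire();
            if (remainFreeAmount == -1) {
                throw new IllegalStateException();
            }

            final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
            if (oldEntry != null) {
                freeEntry.release();
                freePubSubLock.release();

                oldEntry.addListener(channelName, listener);
                oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
                    @Override
                    public void operationComplete(Future<Void> future) throws Exception {
                        lock.release();
                        promise.trySuccess(oldEntry);
                    }
                });
                return;
            }

            if (remainFreeAmount == 0) {
                freePubSubConnections.poll();
            }
            freePubSubLock.release();

            freeEntry.addListener(channelName, listener);
            freeEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
                @Override
                public void operationComplete(Future<Void> future) throws Exception {
                    lock.release();
                    promise.trySuccess(freeEntry);
                }
            });

            if (PubSubType.PSUBSCRIBE == type) {
                freeEntry.psubscribe(codec, channelName);
            } else {
                freeEntry.subscribe(codec, channelName);
            }
        }
    });
}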


IV、 RedissonLock unlocking: the unlock source code

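    public void unlock() {
        Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('exists', KEYS[1]) == 0) then " + // the lock no longer exists (it expired or was already unlocked)
                            "redis.call('publish', KEYS[2], ARGV[1]); " + // publish the unlock message
                            "return 1; " + // return 1 and stop
                        "end;" +
                        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + // the lock exists but is not held by the current thread
                            "return nil;" + // return nil and stop
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // the lock is held by the current thread: decrement its reentrancy count and keep the result in counter
                        "if (counter > 0) then " + // the count is still above 0: the thread still holds the lock through an outer acquisition
                            "redis.call('pexpire', KEYS[1], ARGV[2]); " + // so reset the lock's expiry
                            "return 0; " + // return 0 and stop
                        "else " +
                            "redis.call('del', KEYS[1]); " + // otherwise the thread has fully released the lock: delete the key
                            "redis.call('publish', KEYS[2], ARGV[1]); " + // and publish the unlock message
                            "return 1; "+ // return 1 and stop
                        "end; " +
                        "return nil;", // any other case returns nil
                        Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId()));
        if (opStatus == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
        // the lock is fully released: stop the expiration renewal task
        if (opStatus) {
            cancelExpirationRenewal();
        }
    }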

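KEYS[1] is getName(), the lock name (test_lock).
KEYS[2] is getChannelName(), the channel used for publish/subscribe.
ARGV[1] is the unlock message (LockPubSub.unlockMessage), whose value is the number 0.
ARGV[2] is internalLockLeaseTime, the default lease time of 30 s.
ARGV[3] is getLockName(Thread.currentThread().getId()), the lock id plus the current thread id.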
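
The unlock script has three possible results (nil, 0 and 1), which is easy to lose track of. The following in-memory model for a single lock key is a sketch only: `UnlockScriptSketch` is an invented name, `published` merely counts the unlock messages that would be sent, and the expiry reset in the "still held" branch is noted in a comment rather than simulated.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of the unlock script for one lock key (illustrative, not Redisson code). */
class UnlockScriptSketch {
    final Map<String, Integer> holds = new HashMap<>(); // field (owner) -> reentrancy count
    int published;                                       // number of unlock messages sent

    /** Returns TRUE (1) if the key is gone afterwards, FALSE (0) if still held, null (nil) if not the owner. */
    Boolean unlock(String owner) {
        if (holds.isEmpty()) {                 // exists == 0
            published++;                       // publish
            return Boolean.TRUE;
        }
        if (!holds.containsKey(owner)) {       // hexists == 0
            return null;
        }
        int counter = holds.merge(owner, -1, Integer::sum); // hincrby -1
        if (counter > 0) {
            return Boolean.FALSE;              // still held; the real script also resets the expiry
        }
        holds.clear();                         // del
        published++;                           // publish
        return Boolean.TRUE;
    }
}
```

A thread that locked twice gets FALSE on its first unlock and TRUE on its second, while any other thread gets null, which is the case the Java code turns into an IllegalMonitorStateException.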


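    public void forceUnlock() {
        get(forceUnlockAsync());
    }

    public RFuture<Boolean> forceUnlockAsync() {
        cancelExpirationRenewal();
        // delete the lock key regardless of who holds it, and publish the unlock message if a key was deleted
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('del', KEYS[1]) == 1) then "
                + "redis.call('publish', KEYS[2], ARGV[1]); "
                + "return 1 "
                + "else "
                + "return 0 "
                + "end",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage);
    }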

