Redis的使用(五)常见使用场景-分布式锁实现原理

时间:2024-07-20 07:49:48

1.绪论

为了解决并发问题,我们可以通过加锁的方式来保证数据的一致性,比如java中的synchronize关键字或者ReentrantLock,但是他们只能在同一jvm进程上加锁。现在的项目基本上都是分布式系统,如何对多个java实例进行加锁,这就需要用到分布式锁。分布式锁可以由多种实现方式,本文将要介绍的就是采用redis实现的方式。

2.分布式锁简单实现

2.1 基本原理

redis的分布式锁实现其实是由redis中的setnx命令来实现的,当线程1需要获取锁时,只需要调用setnx lockkey val,如果设置成功,便表示加锁成功。线程2到达时,同样调用setnx lockkey val方法,但是发现redis中已经存在lockkey时,便加锁失败。线程1执行完业务逻辑过后,便可以调用del key删除对应的key,这样便完成解锁。

setnx key val 表示当redis中不存在这个key便设置成功,否者设置失败。

3.如果set key成功,但是del key失败怎么办

3.1 问题

如果线程1在setnx key val调用成功过后,并且执行完业务逻辑,宕机了,此时相当于key便一直存在redis中,这样其他线程便一直不能获取到锁,导致死锁。

3.2 解决方案

我们可以在给key设置一个过期时间,这样利用redis过期策略进行兜底,就算线程1set key成功后宕机了,因为有过期时间,也能保证在一段时间后,这个key会被删除掉。

setnx key val
expire key timeout

可以看出可以使用上面两个命令,来对key设置过期时间,但是还是有问题,就是set key和expire key之间不是一个原子操作,这样会导致在加锁的时候会有并发问题。所以,我们相当redis中的lua脚本,来保证set key和expire key之间是一个原子操作。对于string字符串,redis提供了一个符合命令,来实现上面两个命令的功能:

set key val NX PX timeout

4 如何解决锁误删问题并实现可重入锁功能

4.1 问题描述

4.1.1 锁误删问题

1.线程1获取到锁,过后,由于设置了可以的超时时间,锁过期。

2.线程2来获取锁便能获取成功。

3.线程1执行完成,释放锁,调用del key,删除key成功。

4.线程3加锁,set key成功,但是此时线程2还在执行。

其实上面本质上就是线程释放了不是自己加的锁。

4.1.2 可重入锁

在java中的reentrantLock和syncronized都有可重入功能,即线程在获取到锁过后,能够再次获取当前锁,并且可冲入次数加1,如果释放锁时,可重入次数减1。

4.2 解决方案

既然上面线程释放了不是自己加的锁导致锁误删问题,我们可以在加锁是将线程id,记录到key中,这样,每次释放锁的时候,判断一下是否是本线程程加的锁,如果不是便直接返回,如果是便释放锁,就可以了。

而对于可重入问题,我们在记录线程id的时候,我们可以记录一下重入次数,每次重入的时候,重入加1,释放锁的时候,重入次数减1,减为0便删除key就可以了。

针对上面两点,我们可以采用hash结构来存储key,其中field为线程id,value为重入次数。

所以加锁变为:

//获取重入次数
local time  = hget key threadId
//如果未加锁,设置重入次数为1
if(nil == time) then
    hset key threadId 1
//如果已经加锁,设置重入次数自增
else 
    hincr key threadId
end
expire key 过期时间

释放锁为

//获取可重入次数
local time = hget key threadId
//如果没有threadId这个feild,表示该key已经过期,不用删除
if(nil = time) then 
    return 0
end
//表示当前只加锁了一次,删除锁
if (time < 2) then
    del key
else
//否者重入次数减1
    hincr key threadId -1
end


4.3 代码实现

4.3.1 java代码

public class SimpleRedisLock implements ILock {

    private String name;
    private StringRedisTemplate stringRedisTemplate;

    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    private static final String KEY_PREFIX = "lock:";
    private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
    private static final DefaultRedisScript<Long> LOCK_SCRIPT;

    static {
        LOCK_SCRIPT = new DefaultRedisScript<>();
        LOCK_SCRIPT.setLocation(new ClassPathResource("lock.lua"));
        LOCK_SCRIPT.setResultType(Long.class);

        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        UNLOCK_SCRIPT.setResultType(Long.class);
    }

    @Override
    public boolean tryLock(long timeoutSec) {
        // 获取线程标示
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 调用lua脚本
        Long result = stringRedisTemplate.execute(LOCK_SCRIPT,
                Collections.singletonList(KEY_PREFIX + name),
                ID_PREFIX + Thread.currentThread().getId(),
                timeoutSec);
        if (result == null) {
            return false;
        }
        return 1 == result;
    }

    @Override
    public void unlock() {
        // 调用lua脚本
        stringRedisTemplate.execute(
                UNLOCK_SCRIPT,
                Collections.singletonList(KEY_PREFIX + name),
                ID_PREFIX + Thread.currentThread().getId());
    }
}
注意:java和lua的交互execute方法
/**
 * @param script lua脚本
 * @param keys 需要操作的key,在lua脚本中用KEYS数组来接收,从1开始
 * @param args 其他参数,在lua脚本中用ARGVS数组来接收,从1开始
 * @param <T>
 * @return
 */
public <T> T execute(RedisScript<T> script, List<K> keys, Object... args) {
    return this.scriptExecutor.execute(script, keys, args);
}

4.3.2 加锁lua脚本

--1.获取参数列表
--加锁key在keys数组中
local lockKey = KEYS[1]
--线程id
local threadId = ARGV[1]
--超时时间
local timeout = ARGV[2]

--2.获取重入次数
local time = redis.call('hget', lockKey, threadId)

--3.如果未加锁,设置重入次数为1
if (nil == time) then
    redis.call('hset', lockKey, threadId, 1)
    --如果已经加锁,设置重入次数自增
else
    redis.call('hincr', lockKey, threadId)
end
--4.设置过期时间
redis.call('expire', lockKey, timeout)
return 1

4.3.3 释放锁lua脚本

--1.获取参数列表
--加锁key在keys数组中
local lockKey = KEYS[1]
--线程id
local threadId = ARGV[1]
--2.获取重入次数
local time = redis.call('hget', lockKey, threadId)
--3.如果没有threadId这个feild,表示该key已经过期,不用删除
if(nil == time) then
    return 1
end
--4.如果重入次数为1,表示当前只加锁了一次,删除锁
if (time < 2) then
    redis.call('del', lockKey)
else
--5.否者重入次数减1
    redis.call('hincr', lockKey,threadId)
end
return 1

lua脚本调用redis命令

redis.call('命令名称', 'key', '其它参数', ...)

5.未获取锁时进行重试实现

5.1 问题描述

前面实现的分布式锁时,当线程1获取锁时,线程2尝试获取锁失败,便会直接返回失败。我们如果要线程2在获取锁失败后,在一段时间内尝试获取锁,如果超所该时间,才返回失败,应该如何实现呢?

5.2 问题解决

在线程获取锁失败时,我们可以进行自旋直到获取锁成功为止,但是这样会消耗资源。所以我们可以通过redis的发布订阅机制,当线程获取锁失败过后,订阅加锁的key,然后阻塞。当其他线程释放锁的时候,会给我们发送一个通知,唤醒当前线程。

//订阅某个频道

SUBSCRIBE channel [channel ...]

5.3 redisson源码分析

 public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        //获取线程等待时间
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        //尝试获取锁,如果获取失败,返回当前key还有多久过期
        Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        //等待时间扣减从当前方法进入到执行到这里花费时间
        time -= System.currentTimeMillis() - current;
        //如果小于0表示已经过期
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        //定于当前key
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        try {
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
            //真正的等待逻辑
            while (true) {
                long currentTime = System.currentTimeMillis();
                //再次获取锁,如果失败得到锁还有多久过期
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    //如果key被删除,会pulish消息,唤醒当前线程
                    subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

6.如何解决锁续期问题

6.1 问题描述

我们在实现分布式锁的时候,会设置过期时间,超过这个过期时间,这个key便会被自动删除。假设超过这个过期时间,当前业务逻辑还未执行完成,这样其他线程能拿到锁,会导致并发问题。

6.2 解决方案

我们可以设置一个线程,专门用来监听当前业务逻辑是否完成,如果未完成,便对key的时间进行续期,在redission实现的分布式锁中,这个线程被称作watchDog。我们来看看redisson中是如何实现的。

6.3 redisson源码分析

  private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
//如果设置锁自动释放时间不等于-1,走正常逻辑
        if (leaseTime != -1) {
            return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
//如果锁自动释放时间等于-1,开启watchDog,并且设置锁自动释放时间为30s
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                                                commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                                                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired
            if (ttlRemaining == null) {
                //启动watchDog线程,为当前线程进行锁续期
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }

加锁的流程:可以看出redission实现的分布式锁的lua逻辑其实和我们上面是差不多的。

  <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

锁续期流程:

    private void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
         //其实就是将当前的线程id封装成一个entry,并且加入到一个Map中,然后调用renewExpiration方法
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            renewExpiration();
        }
    }
  private void renewExpiration() {
        //从当前map中获取监听的entry
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        //启动一个定时任务
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                //从map中取出需要续约的线程id
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                //将线程id的过期时间重置为30s
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    
                    if (res) {
                        // reschedule itself
                        //递归调用,一直监听存活的线程
                        renewExpiration();
                    }
                });
            }
        //时间为10会触发一次定时任务
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

简单而言,其实就是启动一个定时线程,一直扫描存活的key,如果未过期,便重置其过期时间为30s。可以看出,当线程调用了unlock方法,便会停止锁续期。

6.4watchDog的几个问题

6.4.1 客户端宕机,watchDog是否会一直续期

答案是不会的,如果客户端宕机,证明当前jvm实例已经挂掉,所以执行watchDog的线程自然也挂掉了。

6.4.2 unLock失败,watchDog是否会一直续期

答案是不会的,在redisson的解锁方法中,会用到一个CompletionStage,它能保证无论删除key是否抛出异常,都能将当前线程id的锁续期任务从EXPIRATION_RENEWAL_MAP中移除。

7.redis主从一致性

7.1 问题描述

如果redis采用单机部署的话,redis宕机,导致整个服务都不可用。如果redis采用集群部署,但是会有主动不同步的问题,比如线程1将key加入到主节点,但是主节点还未将数据同步到从节点宕机,选举从节点为新的主节点,这个主节点并没有线程1设置的key,导致线程安全问题。\

7.2 联锁

7.2.1 实现原理

其实就是部署多个redis单击实例,当加锁的时候,向每个redis实例都发送setnx key val请求,当所有的redis实例都返回成功,才认为成功。

7.3 红锁

联锁是要求所有的锁加锁成功,才表示加锁成功,而红锁是只需要满足大于n/2+1个节点加锁成功,便成功。