代码说明:
watchDog机制主要是用来对redis中商品key进行锁续命,防止业务处理时间过长导致的误删key值。
lua脚本则用来对redis中指令的原子性操作,注意 lua脚本中不能有复杂逻辑,防止阻塞redis
/**
* 创建定时任务线程工厂
*/
private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("watchDog-").get();
/**
* 创建定时任务线程
*/
private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(10,THREAD_FACTORY);
/**
* 存放看门狗返回的线程对象
*/
private static final ConcurrentHashMap<String,ScheduledFuture> CONCURRENT_HASH_MAP = new ConcurrentHashMap<>(16);
@GetMapping("/normal")
public String normalRedisLock() throws InterruptedException {
//给每一个线程都设置对应的UUID
String productId = "product_huawei_p30";
String stock = "stock";
String clientId = ().toString();
try {
//如果线程已经被加锁,死循环等待释放锁
while (true){
Integer stockNum = (().get(stock));
if(stockNum <= 0){
return "商品已经卖完";
}
//线程加锁,为10秒钟,设置为对应的客户端ID
Boolean setIfAbsent = ().setIfAbsent(productId, clientId, 10, );
if((setIfAbsent) && setIfAbsent){
break;
}
}
("----------------------------------开始扣减库存----------------------------------");
/**
* 看门狗机制,目的是在线程业务处理时间过长时,导致锁被提前释放,导致处理完成时错误的释放掉另外线程的锁
*/
WatchDogThread watchDogThread = new WatchDogThread(productId,clientId,stringRedisTemplate,CONCURRENT_HASH_MAP,SCHEDULED_EXECUTOR_SERVICE);
ScheduledFuture<?> scheduledFuture = SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(watchDogThread, 1, 5, );
/**
* 采用ConcurrentHaspMap用来存储,watchDog任务,并且停止指定的watchDog任务
*/
CONCURRENT_HASH_MAP.put(clientId,scheduledFuture);
//执行业务逻辑
int stockNum = (().get(stock));
if(stockNum > 0){
/*("模拟业务处理时间过长,看门狗续命机制.....");
(20000);*/
().set(stock,(stockNum-1));
("扣减库存成功库存数量为:"+().get(stock));
}else {
("库存扣减失败。。。。");
}
} catch (Exception e) {
/**
* 抛出异常时,获取到对应客户端ID的看门狗线程,并且停止看门狗机制
*/
ScheduledFuture scheduledFuture = CONCURRENT_HASH_MAP.get(clientId);
if(scheduledFuture != null){
("异常信息,移除看门狗线程。。。");
(true);
CONCURRENT_HASH_MAP.remove(clientId);
}
} finally {
//释放锁
(productId);
("----------------------------------业务执行完成----------------------------------");
}
return "";
}
WatchDog实现机制
public class WatchDogThread implements Runnable {
private String productId;
private String clientId;
private StringRedisTemplate stringRedisTemplate;
private ConcurrentHashMap<String, ScheduledFuture> cacheMap;
//获取到线程池的引用
private ScheduledExecutorService scheduledExecutorService;
/**
* lua脚本,目的原子操作,获取到商品锁如果等于当前客户端ID,执行锁续命
*/
private static final String SCRIPT = "if ('get',KEYS[1]) == ARGV[1] then" +
" local ttl = tonumber(('ttl',KEYS[1]));" +
" ('expire',KEYS[1],ttl+ARGV[2]) return ('ttl',KEYS[1]) end";
public WatchDogThread(String productId,String clientId, StringRedisTemplate stringRedisTemplate, ConcurrentHashMap<String, ScheduledFuture> concurrentHashMap,ScheduledExecutorService scheduledExecutorService) {
= clientId;
= productId;
= stringRedisTemplate;
= concurrentHashMap;
= scheduledExecutorService;
}
@Override
public void run() {
String lock = ().get(productId);
try {
//如果获取到锁为空,或者获取到的锁不等于当前客户端ID,那么就直接停止看门狗
if ((lock) || !(lock)) {
ScheduledFuture scheduledFuture = (clientId);
if (scheduledFuture != null) {
("库存扣减完成,关闭开门狗。。。");
(true);
(clientId);
}
return;
}
("执行续命任务ID:"+lock);
//执行lua脚本,用来原子性执行锁续命
(new DefaultRedisScript(SCRIPT, ), (productId),clientId,"10");
Long expire = (productId, );
("续命后时间;"+expire);
} catch (Exception e) {
("watchdog执行失败"+());
/**
* 如果watchDog执行续命任务出现异常,直接设置30秒过期时间,防止key值失效,导致误删
*/
(productId,30,);
/*WatchDogThread watchDogThread = new WatchDogThread(productId,clientId,stringRedisTemplate,,);
(watchDogThread, 1, 5, );*/
}
}
}
心得:
如果并发量过高,可以将商品库存进行拆分,如果是redis-cluster架构,可以通过一致性hash,将商品库存分配到不同的redis中进行存储,用来提高并发量。