4.3.1 解决超卖问题
1)系统需求
在抢券模块中需要实现下边两个需求:
1、提升高并发吞吐量
抢券类似抢购、秒杀等业务场景具有时效性的特点,提前规定好用户在什么时间可以抢购,用户访问集中,这样就会给系统造成高并发,抢券模块在设计时要以提升系统在高并发下的吞吐量为目标,吞吐量表示单位时间内系统处理的总请求数量,吞吐量高意味着系统处理能力强。
衡量系统吞吐量的常用指标有哪些?
QPS(Queries Per Second):
每秒查询数(Queries Per Second),它表示系统在每秒内能够处理的查询或请求的数量,是衡量一个系统处理请求的性能和吞吐量的指标。
计算公式:总请求数/时间窗口大小
示例:
在10秒内处理1万个请求,QPS为1000。每个请求处理的时间越短,QPS越大。
假设:一个网站有10万用户,有2万日活跃用户,并发量是4000,每个用户每秒平均发起2个请求,那么总请求数就是 2*4000,那么QPS就是 8000,如果单机支持2000的qps理论上需要4台服务器。
qps指标是需要根据服务器硬件性能、及具体的业务场景去测试,比如:门户查询数据如果直接走Nginx静态服务器则QPS可以达到上万,如果请求查询Tomcat,并且通过数据库去查询数据库返回,此时QPS会远低于查询Nginx静态服务器的QPS值,如果不走数据库,而是从Redis查询数其QPS也会大大提升。
TPS(Transactions Per Second):
表示系统每秒完成的事务数,与QPS不同,TPS更关注系统的事务处理能力,而不仅仅是单纯的查询或请求,一次事务通常会包括多个请求。在高度事务性的系统中,如在线交易系统、支付系统等,TPS是一个关键指标,用于衡量系统的处理能力。
TPS指标通常会涉及业务处理及数据库存储,在测试时也需要根据服务器硬件性能、及具体的业务场景去测试,拿下单举例:单机支持几十到几百的TPS指标属于正常。
在开发中,对以上性能指标的优化,可通过CDN、缓存、异步处理、数据库优化、多线程、集群、负载均衡等技术去提高系统的吞吐量。当然,再优化也不要忘记系统保护,通过限流技术根据系统的性能指标进行限流保护。
2、解决超卖问题
抢购、秒杀等业务场景还需要解决超卖问题,超卖是最终下单购买数量大于库存数量,比如:库存有100个,用户最终购买了101个,多出这一个就是超卖了,结合抢券业务即用户最终抢到的优惠券总数大于优惠券库存数。
下边先分析超卖问题的解决方案。
2)什么是超卖问题
超卖是最终下单购买数量大于库存数量,比如:库存100个用户最终购买成功了101个,多出这一个就是超卖了,结合抢券业务,用户最终抢到的优惠券总数大于优惠券库存数就出现了超卖问题。
导致超卖问题的原因是什么呢?
下边举例说明超卖问题并分析导致超卖问题的原因。
下图是两个线程更新数据库的库存字段。
线程1:先查询库存为1,判断是否大于0,如果大于则库存减1,最后更新数据库库存字段。
线程2:先查询库存为1,判断是否大于0,如果大于则库存减1,最后更新数据库库存字段。
线程1和线程2查询到的库存都是1,两个线程分别减1得到剩余库存数0,由于线程2并不是基于线程1扣减库存后的值进行扣减,线程2更新库存覆盖了线程1更新的库存值。
上边的例子就出现了超卖的问题。
造成超卖问题的原因是在高并发场景下对库存这个共享资源进行操作存在线程不安全所导致。
3)悲观锁与乐观锁
提到解决线程安全问题大家想到了锁,下边复习下关于锁的基本概念。
jvm提供了很多锁,比如:synchronized、reentrantLock、CAS等,它们都可以解决线程安全问题,synchronized、reentrantLock可以实现悲观锁,CAS可以实现乐观锁,关于这些锁的 知识掌握不牢固的一定要自行复习。下边理解悲观锁与乐观锁的概念。
Object lock=new Object();
public void test(){
synchronized (lock){
System.out.println("test");
}
}
- 悲观锁
悲观锁是一种悲观思想,总认为会有其它线程修改数据,为了保证线程安全所以在操作前总是先加锁,操作完成后释放锁,其它线程只有当锁释放后才可以获取锁继续操作数据。synchronized和ReentrantLock都可以实现悲观锁。
使用悲观锁后原来的多线程并发执行改为了顺序(同步)执行,当线程2去执行时查询到库存发现为0,不满足条件更新库存失败。
- 乐观锁
乐观锁则是一种乐观思想,认为不会有太多线程去并发修改数据,所以谁都可以去执行代码。
Java提供的CAS机制可以实现乐观锁,CAS即Compare And Swap 比较并交换,在修改数据前先比较版本号,如果数据的版本号没有变化说明数据没有修改,此时再去更改数据。
示例如下:
库存数据对应一个版本,库存每次变化则版本号跟着变化,如下:
库存 | 版本号 |
---|---|
100 | 1 |
99 | 2 |
… | … |
1 | 100 |
0 | 101 |
线程1修改库存前拿到库存及对应的版本号:1和100。
线程1判断库存如果大于0则将库存减1,准备更新库存。
更新库存时要校验当前库存的版本是否和自己之前拿到的一致,如果版本号为1说明自己在执行的这过程没有其它线程去修改过库存,此时将库存更新为99并将版本号加1为2。
线程2执行和线程1一样的逻辑,线程2去更新库存时发现库存的版本号为2与自己之前拿到的不一致,更新库存失败。
- 结论
悲观锁和乐观锁都是一种解决共享资源的线程安全问题的方法,悲观锁是在读数据时就加锁,如果读比较多则加锁频繁影响性能,相比而言乐观锁性能比悲观锁要好。
4)数据库行锁控制方案
数据库的行级锁可以实现悲观锁也可以实现乐观锁。
- 实现悲观锁(排他锁)
执行select … for update 实现加锁,select … for update 会锁住符合条件的行的数据,如下语句会锁一行的数据
select * from 库存表 where id=? for update
通常此语句放在事务中,开启事务开始时执行此语句获取锁,事务提交或回滚自动锁释放,保证在事务处理过程中没有其它线程去修改数据。
测试:
使用一个线程执行下边的命令:
start transaction;
select ... for update;
COMMIT;
另一个线程修改数据,如果上边不释放锁将无法修改。
高并发场景不推荐使用select … for update方法,同时也可能存在死锁的潜在风险。
- 实现乐观锁
数据库的行级锁也可以实现乐观锁,通用的做法是在表中添加一个version版本字段,在更新时对比版本号,更新成功将版本号加1,SQL为:
update 表名 set 字段=值,version=version+1 where id =? and version =?
针对扣减库存业务扣减库存SQL:
update 库存表 set 库存=库存-1 where 库存>0 and id =?
多线程执行上边的SQL,假如线程1先执行会添加排他锁,当事务没有结束前其它线程去更新同一条记录会被阻塞,等到线程1更新结束其它线程才可以更新库存。
当执行update后返回影响的记录行数为1表示更新成功即扣减库存成功,返回0表示没有更新记录行,即扣减库存失败。
- 结论
悲观锁在查询时就开始加锁,如果读比较多则加锁频繁影响性能,相比而言乐观锁性能比悲观锁要好。
对于并发不高的场景可以使用数据乐观锁去控制扣减库存,由于抢购业务并发较高且对性能要求也高,如果使用数据库行锁去控制,并发高就会对数据造成压力,如果进行限流控制并发数又无法满足性能要求,所以对于抢购业务使用数据库行锁进行控制是不适合的。
5)Redis分布式锁方案
数据库乐观锁不适用高并发场景,我们能否将库存数据放在Redis,并且通过JVM锁去控制扣减库存呢?
上边介绍的synchronized、reentrantLock、CAS只控制了JVM本身的线程争抢同一个锁,无法控制多个JVM之间争抢同一个锁。
如下图,有两个JVM进程,每个JVM进程都有一个Lock01锁,这两个JVM进程中的线程1仍然会同时去修改库存:
线程1:先查询库存为1,判断是否大于0,如果大于则库存减1,最后更新Redis库存数据。
线程2:先查询库存为1,判断是否大于0,如果大于则库存减1,最后更新Redis库存数据。
此时就会出现修改库存数据的线程不安全问题。
所以,如果是单机环境下,使用JVM的锁在内存加锁可以解决资源并发访问的线程安全问题。
微服务架构的项目在部署时每个微服务会部署多个实例(JVM),每个实例就是一个JVM,如果要控制多个JVM之间争抢资源需要用到分布式锁,分布式锁是由一个统一的服务提供分布式锁服务,比如:使用redis、数据库都可以实现分布式锁,下边介绍分布式锁控制争抢资源的方法。
如下图:每个JVM中的线程去争抢同一个分布式锁,在扣减库存前先获取分布式锁,拿到锁再扣减库存,执行完释放锁之后其它JVM的线程才可以获取锁继续扣减库存,如下图:
上边的方案将库存放在Redis中避免与数据库交互,很大的提高的了执行效率,在分布式场景下使用分布式锁是一种常用的控制共享资源的方案。
分布式锁需要搭建独立的分布式锁服务(例如Redis、Zookeeper等),每次操作需要远程与分布式锁服务交互获取锁、释放锁,还有没有性能更高的方案呢?
6)Redis原子操作方案
上边使用分布式锁的方案每次操作需要远程与分布式锁服务交互获取锁、释放锁,有没有优化的方法避免申请锁与释放锁的交互呢?
在分布式锁方案中是在java程序扣减库存最后更新redis库存的值,能否使用redis的decr命令去扣减库存呢?
Redis Decr 命令将 key 中储存的数字值减一,并且具有原子性,Redis中所有命令都具有原子性。
原子性表示该命令在执行过程中是不被中断的,也就实现了多线程去执行decr命令扣减库存是顺序执行的,假如库存原来是100,扣减到0结束,多线程并发执行decr命令不会出现扣减次数超过100次,如下图:
基于这个思想可以对分布式锁方案优化如下:
此方案中没有使用分布式锁,而是基于Redis命令具有原子性的特点实现。
本项目使用Redis原子操作控制超卖问题。
4.3.2 Redis原子操作方案
在Redis原子操作方案中扣减库存使用decr命令实现,decr命令具有原子性,如果在扣减库存操作中有多个操作 ,那么整体还是原子性吗?如下图:
扣减库存逻辑如下:
1、首先查询库存
2、判断库存大小,如果大于0则扣减库存,否则 直接返回
3、记录抢券成功的记录,用于判断用户不能重复抢券的依据。
4、记录抢券同步的记录,用于后续的异步处理,将抢券结果保存到数据库。
如果上述四步整体不具有原子性仍然没有办法控制超卖问题,所以必须保证1、2、3步逻辑放在一起整体具有原子性。
如何保证多个Redis命令具有原子性呢?本节介绍两个保证Redis多个命令具有原子性的方法。
1)通过 MULTI 事务命令实现
对于redis单个命令都是原子操作,现在要求扣减库存、写入抢券成功队列及写入同步队列保证原子性,多个redis命令如何保证原子性呢?
1、通过 MULTI 事务命令实现
下边的命令执行流程如下:
执行MULTI 标记首先标记一个事务块开始。
然后将要执行的命令加入队列。
将“HSET key1 field1 value2 field2 value2” 命令放入队列中,表示向key1中写入两个hashkey。
将“INCR key2”命令放入队列中,表示对key2自增1。
运行EXEC命令按顺序执行,整体保证原子性。
MULTI
HSET key1 field1 value2 field2 value2
INCR key2
EXEC
测试:
2)了解Pipeline与MULTI 的区别
学习过Redis的同学听说过pipline管道命令,pipline也可实现批量执行多个 redis命令,pipline与multi的区别是:
pipeline 是把多个redis指令一起发出去,redis并没有保证这些命令的执行是原子的;multi实现的是将多个命令作为事务块去执行,保证整个操作的原子性。
如果仅是执行多个命令不保证原子性那么使用pipeline 的性能要比multi要高,但是针对本项目要保证多个命令实现原子性的需求那么pipeline 不符合要求。
3)Redis+Lua实现
Lua 是一种强大、高效、轻量级、可嵌入的脚本语言,Lua体积小、启动速度快,从而适合嵌入在别的程序里,Lua可以用于web开发、游戏开发、嵌入式开发等领域。
参考:http://www.lua.org/docs.html,或者去百度搜索Lua中文教程。
对于Lua脚本语法非常容易理解,先不用系统的去学习,先把本项目使用的Lua脚本读懂即可,实际工作中用到时再参考本项目的脚本去写即可,不会的再查Lua 的语法。
先看一个例子,对上边的例子编写Lua脚本,如下:
local ret = redis.call('hset', KEYS[1], ARGV[1], ARGV[2], ARGV[3], ARGV[4]);
redis.call('incr', KEYS[2]);
return ret..'';
说明:
KEYS:表示在脚本中所用到的那些 Redis 键(key),这些键名参数可以在 Lua 中通过全局变量 KEYS 数组,KEYS[1]表示第一个key,KEYS[2]表示第2个key,依次类推。
ARGV:表示在脚本中所用到的参数,在 Lua 中通过全局变量 ARGV 数组访问,访问的形式和 KEYS 变量类似( ARGV[1] 、 ARGV[2] ,诸如此类),ARGV[1]表示第一个参数,ARGV[2]表示第二个参数,依次类推。
如何执行上边的Lua脚本呢?
使用EVAL 命令执行Lua脚本。
EVAL是redis的命令本身具有原子性,整个脚本的执行具有原子性。
EVAL script numkeys key [key ...] arg [arg ...]
参数说明:
- script: 是一段 Lua 5.1 脚本程序。
- numkeys: 用于指定键名参数的个数。(操作大key的数量,小key不管)
- key [key …]: 从 EVAL 的第三个参数开始算起,表示在脚本中所用到的那些 Redis 键(key),这些键名参数可以在 Lua 中通过全局变量 KEYS 数组,用 1 为基址的形式访问( KEYS[1] , KEYS[2] ,以此类推)。
- arg [arg …]: 附加参数,在 Lua 中通过全局变量 ARGV 数组访问,访问的形式和 KEYS 变量类似( ARGV[1] 、 ARGV[2] ,诸如此类)。
执行下边的命令:
eval "local ret = redis.call('hset', KEYS[1], ARGV[1], ARGV[2], ARGV[3], ARGV[4]);redis.call('incr', KEYS[2]);return ret..'';" 2 test_key01 test_key02 field1 aa field2 bb
说明:
eval后边的script参数即脚本程序,将上边的Lua脚本使用双引号括起来。
numkeys:为2表示2个key
之后传入key的名称(多key中间用空格分隔):test_key01 test_key02
key后边再传入ARGV 参数(多ARGV 中间用空格分隔):field1 aa field2 bb
测试结果如下所示:
返回2表示向hash中写入2个key
下边是使用RedisTemplate执行Lua脚本的方法:
<T> T execute(RedisScript<T> script, List<K> keys, Object... args)
通过第一个参数类型指定要执行的Lua脚本,RedisScript的实现类是DefaultRedisScript,下边查阅DefaultRedisScript的源代码。
第一种方法是将Lua脚本的内容作为字符串传入DefaultRedisScript对象并且指定返回值类型,代码如下:
public DefaultRedisScript(String script, @Nullable Class<T> resultType) {
this.shaModifiedMonitor = new Object();
this.setScriptText(script);
this.resultType = resultType;
}
如果脚本内容比较多使用第一种方法显得很麻烦。
第二种方法是指定Lua脚本的位置,通过DefaultRedisScript的setScriptSource方法完成,如下:
public void setScriptSource(ScriptSource scriptSource) {
this.scriptSource = scriptSource;
}
本项目使用第二种方法,在RedisLuaConfiguration中定义DefaultRedisScript bean
@Bean("Lua_test01")
public DefaultRedisScript<Integer> getLuaTest01() {
DefaultRedisScript<Integer> redisScript = new DefaultRedisScript<>();
//resource目录下的scripts文件下的Lua_test01.Lua文件
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("scripts/Lua_test01.Lua")));
redisScript.setResultType(Integer.class);
return redisScript;
}
创建lua脚本
创建RedisTest测试类:
注入上边定义的DefaultRedisScript,注意注入时指定名称“Lua_test01”。
package com.jzo2o.market.service;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;
/**
* @author Mr.M
* @version 1.0
* @description TODO
* @date 2023/10/13 16:28
*/
@SpringBootTest
@Slf4j
public class RedisLuaTest {
@Resource(name = "redisTemplate")
RedisTemplate redisTemplate;
@Resource(name = "Lua_test01")
DefaultRedisScript script;
//测试Lua
@Test
public void test_Luafirst() {
//参数1:key ,key1:test_key01 key2:test_key02
List<String> keys = Arrays.asList("test_key01","test_key02");
//参数2:传入Lua脚本的参数,"field1","aa","field2", "bb"
Object result = redisTemplate.execute(script, keys, "field1","aa","field2", "bb");
log.info("执行结果:{}",result);
}
}
执行test_Luafirst() 测试方法。
4)选择方案
上边学习了Multi和Redis+Lua两种实现Redis原子操作的方案,在本项目你会选择哪一种方案?
你肯定会说是第一种方案,因为简单,使用Lua脚本还要写Lua脚本,去学习它的语法。
不过,在实际使用中要根据具体的需求去确定方案,比如下边的Lua脚本在执行过程中就会有一些业务逻辑判断,不满足条件提前返回结果,而MULTI 执行命令是执行完成最后一起拿到所有命令的执行结果,并且MULTI 不适合写带有业务逻辑的脚本内容。
下边的lua脚本实现了项目抢券功能,可以尝试阅读,稍后会详细讲解。
-- 抢券Lua实现
-- key: 抢券同步队列,资源库存,抢券成功列表
-- argv:活动id,用户id
--优惠券是否已经抢过
local couponNum = redis.call("HGET", KEYS[3], ARGV[2])
-- hget 获取不到数据返回false而不是nil
if couponNum ~= false and tonumber(couponNum) >= 1
then
return "-1";
end
-- --库存是否充足校验
local stockNum = redis.call("HGET",KEYS[2], ARGV[1])
if stockNum == false or tonumber(stockNum) < 1
then
return "-2";
end
--抢券列表
local listNum = redis.call("HSET",KEYS[3], ARGV[2], 1)
if listNum == false or tonumber(listNum) < 1
then
return "-3";
end
--减库存
stockNum = redis.call("HINCRBY",KEYS[2], ARGV[1], -1)
if tonumber(stockNum) < 0
then
return "-4"
end
-- 抢单结果写入同步队列
local result = redis.call("HSETNX", KEYS[1], ARGV[2],ARGV[1])
if result > 0
then
return ARGV[1] ..""
end
return "-5"
根据需求本项目使用Redis执行Lua脚本的方式保证多命令的原子性,完成抢券功能。
在Spring的测试中,因为config配置中针对不同的脚本文件使用不同的bean,自然测试的时候需要引入对应的bean,并且我们现在是单元测试,自然lua_test01的脚本文件应该放在test下的resource的script文件夹中
@SpringBootTest
@Slf4j
public class RedisLuaTest {
@Resource(name = "redisTemplate")
RedisTemplate redisTemplate;
@Resource(name = "lua_test01")
DefaultRedisScript script;
//测试lua
@Test
public void test_luafirst() {
//参数1:key ,key1:test_key01 key2:test_key02
List<String> keys = Arrays.asList("bblb1","bblb2");
//参数2:传入lua脚本的参数,"field1","aa","field2", "bb"
Object result = redisTemplate.execute(script, keys, "test1","bb","test2", "lb");
log.info("执行结果:{}",result);
}
@Test
public void test_luafirst2() {
//参数1:key ,key1:test_key01
List<String> keys = Arrays.asList("test_key01{1}","test_key02{1}");
//参数2:传入lua脚本的参数,"field1","aa","field2", "bb"
Object result = redisTemplate.execute(script, keys, "field1","aa","field2", "bb");
log.info("执行结果:{}",result);
}
}
对应的lua_test01中的内容
local ret = redis.call('hset', KEYS[1], ARGV[1], ARGV[2], ARGV[3], ARGV[4]);
redis.call('incr', KEYS[2]);
return ret..'';
测试理论上,应该是两个大key,分别是bblb1和bblb2,然后bblb2里有自增的1,bblb2有俩键值对,分别是(test1,bb)和(test2,lb)
测试成功
5)使用Lua脚本注意点
Lua脚本在redis集群上执行需要注意什么?
在redis集群下执行redis命令会根据key求哈希,确定具体的槽位(slot),然后将命令路由到负责该槽位的 Redis 节点上。
执行一次Lua脚本会涉及到多个key,在redis集群下执行lua脚本要求多个key必须最终落到同一个节点,否则调用Lua脚本会报错:ERR eval/evalsha command keys must be in same slot。
如何保证多个key落地到一个redis节点呢?
只要保证多个key的哈希值一致即可保证多个key落到一个redis节点上,这个如何实现呢?
解决方法:一次执行Lua脚本的所有key中使用大括号‘{}’且保证大括号中的内容相同,此时会根据大括号中的内容求哈希,因为内容相同所以求得的哈希数据相同所以就落在了同一个Redis节点。
测试如下:
所以我们的代码要变成在key名称后边添加{},大括号中写一个固定的值。
@Test
public void test_luafirst2() {
//参数1:key ,key1:test_key01
List<String> keys = Arrays.asList("bblb1{1}","bblb{1}");
//参数2:传入lua脚本的参数,"field1","aa","field2", "bb"
Object result = redisTemplate.execute(script, keys, "test1","bb","test2", "lb");
log.info("执行结果:{}",result);
}
执行测试成功,观察redis多了两个key:"bblb1{1}“和"bblb{1}”
4.3.3 抢券整体方案
1)抢券方案分析
抢券的架构设计思想同抢券查询,将库存保存在Redis,避免抢券操作请求数据库,通过异步任务将Redis中的抢券结果同步到数据库。
抢券的交互流程如下:
如下图:
说明如下:
1、由预热程序将待生效库存同步到redis(活动开始将不允许更改库存)
2、活动开始后,抢券程序请求Redis扣减库存,扣减库存成功向抢券成功队列和抢券同步队列写入记录
Redis中两个队列的作用如下:
抢券成功队列:为了校验用户是否抢过该优惠券。
抢券同步队列:将抢券结果同步到数据库
3、通过定时任务程序根据Redis中同步队列记录的用户抢券结果信息将数据同步到MySQL,具体操作如下:
向优惠券表插入用户抢券记录。
更新优惠券活动表的库存。
写入数据库完成后删除Redis中同步队列的相应记录,删除后表示同步完成,如果同步过程失败将保留Redis同步队列的相应记录。
2)数据流
根据交互流程分析数据流如下:
3)Redis数据结构
- 活动信息
缓存结构:String类型:
key: “ACTIVITY:LIST”
value: 符合条件的优惠券活动列表JSON数据。
过期时间:永不过期
缓存一致性方案:通过预热程序保证缓存一致性
- 优惠券活动库存
缓存结构:Hash
RedisKey:COUPON:RESOURCE:STOCK:{活动id%10}
{活动id%10}表示根据活动id除以10求余,通过这种方法将key分散到不同的redis服务器上,通过“活动id%10”表达式可知优惠券活动库存hash最多有10个。
HashKey:活动id
HashValue: 库存
过期时间:永不过期
缓存一致性方案:通过预热程序保证缓存一致性
举例:
如果n为10,1号活动的库存是100,将1号活动库存存储Redis的效果如下:
- 抢券成功队列
缓存结构:Hash
RedisKey:COUPON:SEIZE:LIST:活动id_{活动id%10}
HashKey:用户id
HashValue:1
过期时间:永不过期
- 抢券同步队列
缓存结构:Hash
RedisKey:QUEUE:COUPON:SEIZE:SYNC:{活动id%10}
HashKey:用户id
HashValue:活动id
过期时间:永不过期
4)小结
抢券是怎么做的?或方案是什么?
抢券业务的Redis数据结构用的什么?具体说说
秒杀系统中如何进行流量削峰?
在秒杀系统中进行流量削峰是非常重要的,因为瞬时的高流量可能会导致系统崩溃或性能下降。以下是一些常见的流量削峰策略:
- **限流措施:**通过控制请求的发放速率,可以有效地平滑流量,避免瞬时的高并发。
- 队列缓冲: 使用消息队列来缓冲请求,将瞬时的高并发请求进行缓存和排队。秒杀系统可以异步地从队列中取出请求进行处理,以平滑处理流量。
- **分批处理:**将瞬时的高并发请求分批处理。不需要一次性处理所有请求,可以将请求按照一定的规模分批处理,以减轻数据库和系统的压力。
- 负载均衡:采用多节点部署,通过负载均衡器将流量分发到不同的服务器上。
-
熔断机制:
- 熔断策略: 实现熔断机制,当系统达到一定的负载阈值时,暂时停止接受新的请求,防止系统崩溃。等到系统恢复后再重新开启。
- **缓存预热:**在秒杀开始之前,提前将秒杀商品的信息加载到缓存中,减轻数据库的压力。
- **验证码和身份验证:**引入验证码和身份验证机制,防止机器人或恶意请求,减少无效请求对系统的冲击。
- **数据库优化:**对于秒杀系统,数据库通常是瓶颈之一。通过优化数据库结构、建立索引、使用缓存等手段来提高数据库的读写性能。
4.3.4 库存同步
1)系统设计
根据整体方案分析,用户抢券要在Redis扣减库存,所以需要提前将优惠券活动的库存同步到Redis。
可以通过定时预热程序中将优惠券活动的库存同步到Redis,同步规则如下:
- 对于待生效的活动更新库存。
- 对于已生效的活动如果库存已经同步则不再同步,只更新没有同步库存的活动。
做第二点的原因是为了避免时间差问题,活动状态更改为进行中了但是库存还没有同步到Redis。
当用户抢券成功,Redis中的库存有了变化,如何将最新库存由Redis同步MySQL呢?
根据整体方案分析,在抢券结果同步程序中根据抢券结果修改数据库中的库存,此部分在抢券结果同步章节再确定具体的方法。
交互流程如下:
2)预热程序中同步库存
下边实现在预热程序中同步库存。
在com.jzo2o.market.service.impl.ActivityServiceImpl#preHeat的预热程序中添加:
@Override
public void preHeat() {
/**
*SELECT *FROM activity tWHERE t.distribute_start_time <= DATE_ADD(NOW(), INTERVAL 30 DAY) AND t.status IN (1, 2)ORDER BY t.distribute_start_time ASC;
*/
// 1.查询准备
LocalDateTime now = DateUtils.now();
LocalDateTime preHeatTime = now.plusDays(30);
LambdaQueryWrapper<Activity> lambdaQueryWrapper = new LambdaQueryWrapper<>();
// 查询条件
lambdaQueryWrapper.le(Activity::getDistributeStartTime, preHeatTime)
.in(Activity::getStatus, Arrays.asList(NO_DISTRIBUTE.getStatus(), DISTRIBUTING.getStatus()))
.orderByAsc(Activity::getDistributeStartTime);
// 查询
List<Activity> activities = baseMapper.selectList(lambdaQueryWrapper);
if (CollUtils.isEmpty(activities)) {
//防止缓存穿透
activities = new ArrayList<>();
}
// 2.数据转换: 将List<Activity> 转为List<SeizeCouponInfoResDTO>
List<SeizeCouponInfoResDTO> seizeCouponInfoResDTOS = BeanUtils.copyToList(activities, SeizeCouponInfoResDTO.class);
// 3.再转为json字符串
String json = JsonUtils.toJsonStr(seizeCouponInfoResDTOS);
// 4.存入redis,操作string用opsForValue(),操作哈希用opsForHash()
redisTemplate.opsForValue().set(ACTIVITY_CACHE_LIST, json);
// 5.对未开始的活动的库存直接更新到redis
activities.stream().filter(v->getStatus(v.getDistributeStartTime(),v.getDistributeEndTime(),v.getStatus())==NO_DISTRIBUTE.getStatus()).forEach(v->{
redisTemplate.opsForHash().put(String.format(COUPON_RESOURCE_STOCK, v.getId() % 10), v.getId(), v.getTotalNum());
});
// 6.对已经开始的活动的库存,如果redis中没有,则更新到redis
activities.stream().filter(v->getStatus(v.getDistributeStartTime(),v.getDistributeEndTime(),v.getStatus())==DISTRIBUTING.getStatus()).forEach(v->{
redisTemplate.opsForHash().putIfAbsent(String.format(COUPON_RESOURCE_STOCK, v.getId() % 10), v.getId(), v.getTotalNum());
});
说明:
对于待生效的活动库存使用put方法,可以对已设置的记录进行更改。
对已生效的活动库存使用putIfAbsent实现,当key不存在时才执行设置操作。
String.format(COUPON_RESOURCE_STOCK, v.getId() % 10) 用来拼装 key,库存的redis key为:
COUPON:RESOURCE:STOCK:{活动id%10}。
3)测试
测试流程:
启动定时预热程序任务。
观察redis中是否成功存储库存信息。
先清空redis后,直接执行一次定时任务
我们当前只有洗澡大派送是进行中,自然只有一个。
4.3.5 抢券Lua脚本
1)抢券Lua脚本
本节对抢券Lua脚本进行阅读并测试,理解抢券的过程。
- 阅读下边的Lua脚本
-- 抢券lua实现
-- key: 抢券同步队列,资源库存,抢券成功列表
-- argv:活动id,用户id
--优惠券是否已经抢过
local couponNum = redis.call("HGET", KEYS[3], ARGV[2])
-- hget 获取不到数据返回false而不是nil
if couponNum ~= false and tonumber(couponNum) >= 1
then
return "-1";
end
-- --库存是否充足校验
local stockNum = redis.call("HGET",KEYS[2], ARGV[1])
if stockNum == false or tonumber(stockNum) < 1
then
return "-2";
end
--抢券列表
local listNum = redis.call("HSET",KEYS[3], ARGV[2], 1)
if listNum == false or tonumber(listNum) < 1
then
return "-3";
end
--减库存
stockNum = redis.call("HINCRBY",KEYS[2], ARGV[1], -1)
if tonumber(stockNum) < 0
then
return "-4"
end
-- 抢券结果写入同步队列
local result = redis.call("HSETNX", KEYS[1], ARGV[2],ARGV[1])
if result > 0
then
return ARGV[1] ..""
end
return "-5"
错误代码:
-1: 限领一张
-2: 已抢光
-3: 写入抢券成功队列失败,返回给用户为:抢券失败
-4: 已抢光
-5: 写入抢券同步队列失败,返回给用户为:抢券失败
2)测试
编写测试方法,准备好调用抢券Lua脚本需要传入的key和参数。
代码如下:
@Test
void test_seizeCouponScriptLua() {
//argv:抢券活动id
long activityId = 1851921214852177920L;
// argv: 用户id
Long userId = 1828787045661319168L;
//index:就是rediskey后大括号里的内容,多个rediskey的内容必须一直,否则会出现数据不一致,就是活动id%10
//int index = (int) (activityId % 10);
int index =0;
//key: 抢券同步队列,资源库存,抢券成功列表
// 同步队列redisKey
String couponSeizeSyncRedisKey = RedisSyncQueueUtils.getQueueRedisKey(COUPON_SEIZE_SYNC_QUEUE_NAME, index);
// 资源库存redisKey
String resourceStockRedisKey = String.format(COUPON_RESOURCE_STOCK, index);
// 抢券成功列表
String couponSeizeListRedisKey = String.format(COUPON_SEIZE_LIST,activityId, index);
// 抢券
Object execute = redisTemplate.execute(seizeCouponScript, Arrays.asList(couponSeizeSyncRedisKey, resourceStockRedisKey, couponSeizeListRedisKey),
activityId, userId);
log.debug("seize coupon result : {}", execute);
}
执行成功,观察redis:
示例:
库存是否减少:
抢券成功队列是否存在相应记录:
抢券同步队列是否存在相应记录:
3) 小结
抢券的Lua脚本做的什么工作?
- 判断用户是否在该活动抢过券。
- 判断库存是否充足
- 写入抢券成功列表
- 扣减库存
- 写入抢券同步列表
4.3.6 抢券接口开发
1)接口定义
下边进行接口分析,定义抢券接口。
在下边的界面中点击“立即领取”即开始抢券。
请求哪些参数?
抢券需要明确两个元素: 哪个用户抢的是哪个活动的优惠券。
用户的身份信息在token中由前端传入服务端。
所以,本接口需要传入服务端的参数是活动ID。
传入参数:活动ID。
响应结果:无,通过状态码判断。
接口定义如下:
接口名称:抢券接口
接口路径:POST/market/consumer/coupon/seize
编写controller方法:
@RestController("consumerCouponController")
@RequestMapping("/consumer/coupon")
@Api(tags = "用户端-优惠券相关接口")
public class CouponController {
@PostMapping("/seize")
public void seizeCoupon(@RequestBody SeizeCouponReqDTO seizeCouponReqDTO) {
}
...
2)校验活动有效性
下边定义service方法,在service方法中需要做哪些事?
- 校验活动是否有效
- 调用Lua脚本执行抢券
本节实现第一步,校验活动的有效性。
定义 service接口如下:
public interface ICouponService extends IService<Coupon> {
/**
* 抢券
*
* @param seizeCouponReqDTO
*/
void seizeCoupon(SeizeCouponReqDTO seizeCouponReqDTO);
...
实现类:
@Override
public void seizeCoupon(SeizeCouponReqDTO seizeCouponReqDTO) {
// 1.校验活动开始时间或结束
// 首先从缓存查询活动
// 2.抢券准备
// key: 抢券同步队列,资源库存,抢券列表
// argv:抢券id,用户id
// 3.执行lua脚本进行抢券结果
// 4.处理lua脚本结果,失败的抛出异常,成功的正常返回
}
如何校验活动是否有效?
1、从缓存中查询指定活动的信息
抢券接口避免与数据库交互。
2、根据活动时间校验活动是否未开始或者已经结束,这两类活动不允许抢券
实现方法如下:
- 定义从缓存查询指定活动信息的方法
public interface IActivityService extends IService<Activity> {
/**
* 从缓存中获取活动信息
* @param id
* @return
*/
ActivityInfoResDTO getActivityInfoByIdFromCache(Long id);
实现方法如下:
@Override
public ActivityInfoResDTO getActivityInfoByIdFromCache(Long id) {
// 1.从缓存中获取活动信息
Object activityList = redisTemplate.opsForValue().get(ACTIVITY_CACHE_LIST);
if (ObjectUtils.isNull(activityList)) {
return null;
}
// 2.过滤指定活动信息
List<ActivityInfoResDTO> list = JsonUtils.toList(activityList.toString(), ActivityInfoResDTO.class);
if (CollUtils.isEmpty(list)) {
return null;
}
// 3.过滤指定活动
return list.stream()
.filter(activityInfoResDTO -> activityInfoResDTO.getId().equals(id))
.findFirst().orElse(null);
}
3)抢券service方法
下边编写抢券的service方法
@Override
public void seizeCoupon(SeizeCouponReqDTO seizeCouponReqDTO) {
// 1.校验活动开始时间或结束
// 抢券时间
ActivityInfoResDTO activity = activityService.getActivityInfoByIdFromCache(seizeCouponReqDTO.getId());
LocalDateTime now = DateUtils.now();
if (activity == null ||
activity.getDistributeStartTime().isAfter(now)) {
throw new CommonException(SEIZE_COUPON_FAILD, "活动未开始");
}
if (activity.getDistributeEndTime().isBefore(now)) {
throw new CommonException(SEIZE_COUPON_FAILD, "活动已结束");
}
// 2.抢券准备
// key: 抢券同步队列,资源库存,抢券列表
// argv:抢券id,用户id
int index = (int) (seizeCouponReqDTO.getId() % 10);
// 同步队列redisKey
String couponSeizeSyncRedisKey = RedisSyncQueueUtils.getQueueRedisKey(COUPON_SEIZE_SYNC_QUEUE_NAME, index);
// 资源库存redisKey
String resourceStockRedisKey = String.format(COUPON_RESOURCE_STOCK, index);
// 抢券列表
String couponSeizeListRedisKey = String.format(COUPON_SEIZE_LIST, activity.getId(), index);
log.debug("seize coupon keys -> couponSeizeListRedisKey->{},resourceStockRedisKey->{},couponSeizeListRedisKey->{},seizeCouponReqDTO.getId()->{},UserContext.currentUserId():{}",
couponSeizeListRedisKey, resourceStockRedisKey, couponSeizeListRedisKey, seizeCouponReqDTO.getId(), UserContext.currentUserId());
// 3.抢券结果
Object execute = redisTemplate.execute(seizeCouponScript, Arrays.asList(couponSeizeSyncRedisKey, resourceStockRedisKey, couponSeizeListRedisKey),
seizeCouponReqDTO.getId(), UserContext.currentUserId());
log.debug("seize coupon result : {}", execute);
// 4.处理lua脚本结果
if (execute == null) {
throw new CommonException(SEIZE_COUPON_FAILD, "抢券失败");
}
long result = NumberUtils.parseLong(execute.toString());
if (result > 0) {
return;
}
if (result == -1) {
throw new CommonException(SEIZE_COUPON_FAILD, "限领一张");
}
if (result == -2 || result == -4) {
throw new CommonException(SEIZE_COUPON_FAILD, "已抢光!");
}
throw new CommonException(SEIZE_COUPON_FAILD, "抢券失败");
}
4)抢券controller方法
@ApiOperation("抢券接口")
@PostMapping("/seize")
public void seizeCoupon(@RequestBody SeizeCouponReqDTO seizeCouponReqDTO) {
couponService.seizeCoupon(seizeCouponReqDTO);
}
5)抢券测试
启动网关
启动优惠券活动管理工程
启动xxl-job
打开小程序,进入抢券页面
点击领取,观察写入redis的数据是否正确:
示例:
抢券成功存入redis抢券成功队列:
存入redis抢券同步队列:
抢券失败情况测试:
限领一张提示:
已抢光就不测试了
6)小结
项目是怎么实现抢券功能的?
1)将优惠券活动的库存同步到Redis
2)用户抢券请求redis,执行Lua脚本,具体如下:
先判断当前用户是否抢过该优惠券,如果抢过则返回-1
判断库存是否充足,如果不充足返回-2
向抢券成功列表写入记录
扣减库存
向抢券同步列表写入记录
3)由异步任务将redis抢券成功记录同步到数据库中