秒杀业务
业务需求1:给库存的判断添加乐观锁
业务需求2:给一人只能下一单的判断,使用分布式锁的方式添加悲观锁
秒杀业务请求路径:/voucher-order/seckill/{id}
乐观锁与悲观锁
悲观锁:认为线程安全一定会发送,因此在操作数据之前先获取锁,确保线程串行执行
乐观锁:不加锁,只在更新数据时判断有没有其他线程对数据做了修改,若发现数据被修改则说明发生了安全问题
对于乐观锁,我们可以使用版本号来判断数据是否被修改,维护一个版本号字段(版本号法);也可以直接使用已有的字段来判断数据是否被修改(CAS法)
库存判断加乐观锁
对于库存的判断,这里使用 CAS 法给秒杀优惠卷的业务添加乐观锁,在更新库存的时候,再次查询库存是否大于 0 即可。
在判断还有库存后,从数据库中将库存 -1,这里我选择添加 Mapper 方法来操作数据库,and stock > 0 是加乐观锁的操作
@Mapper
public interface VoucherSeckillMapper extends BaseMapper<SeckillVoucher> {
@Update("update tb_seckill_voucher set stock = stock-1 where voucher_id = #{voucherId} and stock > 0")
boolean updateSecStock(@Param("voucherId") Long voucherId);
}
测试该方法:
@Autowired
private VoucherSeckillMapper voucherSeckillMapper;
//扣除秒杀优惠卷库存
@Test
void delStock(){
voucherSeckillMapper.updateSecStock(10L);
}
查询数据库库存,成功 -1
完成秒杀请求
创建一个订单表的封装类:
/**
* 订单表封装bean
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("tb_voucher_order")
public class VoucherOrder implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 主键
*/
@TableId(value = "id", type = IdType.INPUT)
private Long id;
/**
* 下单的用户id
*/
private Long userId;
/**
* 购买的代金券id
*/
private Long voucherId;
/**
* 支付方式 1:余额支付;2:支付宝;3:微信
*/
private Integer payType;
/**
* 订单状态,1:未支付;2:已支付;3:已核销;4:已取消;5:退款中;6:已退款
*/
private Integer status;
/**
* 下单时间
*/
private LocalDateTime createTime;
/**
* 支付时间
*/
private LocalDateTime payTime;
/**
* 核销时间
*/
private LocalDateTime useTime;
/**
* 退款时间
*/
private LocalDateTime refundTime;
/**
* 更新时间
*/
private LocalDateTime updateTime;
}
controller:
@RestController
@RequestMapping("/voucher-order")
public class VoucherOrderController {
@Resource
private VoucherOrderService voucherOrderService;
@PostMapping("/seckill/{id}")
public Result orderVoucher(@PathVariable("id") Long voucherId){
return voucherOrderService.seckillVoucher(voucherId);
}
}
Service:
public interface VoucherOrderService {
Result seckillVoucher(Long voucherId);
}
@Service
public class VoucherOrderServiceImpl implements VoucherOrderService {
@Resource
private VoucherSeckillMapper voucherSeckillMapper;
//id 生成器
@Resource
private CreateOnlyId createOnlyId;
@Resource
private VoucherOrderMapper voucherOrderMapper;
@Override
public Result seckillVoucher(Long voucherId) {
//查询秒杀优惠卷
SeckillVoucher seckillVoucher = voucherSeckillMapper.selectById(voucherId);
//判断是否在活动时间内
if(seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())){
return Result.fail("活动尚未开始");
}
if(seckillVoucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("活动已经结束");
}
//查看库存
if(seckillVoucher.getStock() < 1){
return Result.fail("库存不足");
}
//扣减库存
boolean flag = voucherSeckillMapper.updateSecStock(voucherId);
if(!flag){
return Result.fail("库存不足");
}
//创建订单
VoucherOrder voucherOrder = new VoucherOrder();
long orderKey = createOnlyId.createID("order");
voucherOrder.setId(orderKey);
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(UserHolder.getUser().getId());
//将订单存入数据库
voucherOrderMapper.insert(voucherOrder);
return Result.ok(orderKey);
}
}
Mapper:
@Mapper
public interface VoucherOrderMapper extends BaseMapper<VoucherOrder> {
}
测试:
注意在发送测试请求时,需要在请求头带上 token
为了避免重复登录,我们手动添加一个永久登录用户到缓存重
//缓存一个永久用的用户登录信息
@Test
void saveUserForever(){
Map<String,String> userMap = new HashMap<>();
userMap.put("id","1");
userMap.put("nickName","管理员");
stringRedisTemplate.opsForHash().putAll("login:token:"+"textUser",userMap);
}
发送抢秒杀卷请求:
成功抢到卷,库存 -1 ,(自己测了几次,目前还剩余 94张)
接下来,我们使用 jmeter 发送 94 * 2 个抢秒杀卷的请求,测试线程的安全性,若秒杀卷出现负数,则代表超卖了
一人一单加悲观锁
对于一人一单的逻辑判断,由于下单操作无法进行更新,所以只能添加悲观锁
悲观锁:在加锁时,我们将锁加在含有事物提交的方法上,对于方法的调用,会出现没有被 spirng 管理导致事物无法提交的情况,我们需要拿到其 sping 代理对象来调用含有事物提交的方法(需要添加 aspectjweaver 依赖,并在启动类添加 @EnableAspectJAutoProxy(exposeProxy = true) 注解)
对于以上的加锁方式,若在 tomcat 集群中,会因为在不同的服务中导致安全问题出现,且步骤繁琐。我们可以通过 redis 分布式锁的方式解决
redis 分布式锁
在存入锁时,我们使用 UUID 生成 Key
redis 命令
//若没有锁就加一个过期时间为 10 s 的锁,若有锁则操作无效
set lock thread1 NX EX 10
java 代码:
//存入 redis 的key
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock() {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, 10, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
在释放锁时,我们需要判断要释放的锁和当前线程的锁是否一致,一致才释放,若不一致则直接让本该释放的锁通过 ttl 延迟释放
我们使用 lua 脚本来保证获取锁和删除锁操作的原子性,即同时成功同时失败
--对于 Luq 脚本,若有传参,key 会被放到 KEYS 数组中,其他参数被放到 ARGV 数组中
--对于释放锁的 Luq 脚本
if (redis.call('GET',KEYS[1]) == ARGV[1]) then
return redis.call('DEL',KEYS[1])
end
return 0
将 lua 脚本文件放入 resource 文件夹下(即ymal同级目录),并在锁工具类中初始化后调用
//提前将 lua 脚本中的内容读取出来,避免重复读取
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public void unLock() {
//调用 Lua 脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX+name),
ID_PREFIX + Thread.currentThread().getId()
);
}
完整的分布式锁代码:
@Component
public class RedisLock implements ILock{
private String name;
private StringRedisTemplate stringRedisTemplate;
public RedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
//存入 redis 的key
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
//提前将 lua 脚本中的内容读取出来,避免重复读取
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public boolean tryLock() {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, 10, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unLock() {
//调用 Lua 脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX+name),
ID_PREFIX + Thread.currentThread().getId()
);
}
}
使用分布式锁工具完成一人一单的悲观锁设置
Mapper查询用户id和优惠卷id是否已经出现:
@Mapper
public interface VoucherOrderMapper extends BaseMapper<VoucherOrder> {
@Select("select count(*) from tb_voucher_order where user_id = #{id} and voucher_id = #{voucherId}")
int overBuy(@Param("id") Long id,@Param("voucherId") Long voucherId);
}
测试:
@Autowired
private VoucherOrderMapper voucherOrderMapper;
//测试用户多次购买
@Test
void overBuy(){
int i = voucherOrderMapper.overBuy(1L, 10L);
System.out.println(i);
}
将一人一单的判断放在库存之前,原因是 redis 的查询效率更高
@Service
public class VoucherOrderServiceImpl implements VoucherOrderService {
@Resource
private VoucherSeckillMapper voucherSeckillMapper;
//id 生成器
@Resource
private CreateOnlyId createOnlyId;
@Resource
private VoucherOrderMapper voucherOrderMapper;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result seckillVoucher(Long voucherId) {
//查询秒杀优惠卷
SeckillVoucher seckillVoucher = voucherSeckillMapper.selectById(voucherId);
//判断是否在活动时间内
if(seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())){
return Result.fail("活动尚未开始");
}
if(seckillVoucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("活动已经结束");
}
//先判断一人一单逻辑
UserDTO user = UserHolder.getUser();
RedisLock redisLock = new RedisLock("order:"+user.getId(),stringRedisTemplate);
boolean getLock = redisLock.tryLock();//尝试获取锁
if(!getLock){
//获取锁失败,直接返回失败
return Result.fail("一个用户只能购买一单");
}
//获取锁成功
long orderKey = 0;
try {
//查询当前店铺下的卷是否已经有 user_id 了,有就返回fail
int count = voucherOrderMapper.overBuy(user.getId(), voucherId);
if(count > 0){
return Result.fail("一个用户只能购买一单");
}
//查看库存
if(seckillVoucher.getStock() < 1){
return Result.fail("库存不足");
}
//扣减库存
boolean flag = voucherSeckillMapper.updateSecStock(voucherId);
if(!flag){
return Result.fail("库存不足");
}
//创建订单
VoucherOrder voucherOrder = new VoucherOrder();
orderKey = createOnlyId.createID("order");
voucherOrder.setId(orderKey);
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(user.getId());
//将订单存入数据库
voucherOrderMapper.insert(voucherOrder);
} catch (Exception e) {
throw new RuntimeException();
} finally {
//释放锁
redisLock.unLock();
}
return Result.ok(orderKey);
}
}
测试是否一个人能否在短时间内抢到多个秒杀卷:
发送大量相同用户请求后,只减少了一个库存
订单表的记录也只有一条
不过这样的加锁方式任然有少量的线程安全问题,后续会继续优化分布式锁