并发下库存扣减和锁

时间:2024-02-24 15:19:18

先说场景:

物品W现在库存剩余1个,用户P1、P2同时购买,只有1人能购买成功,不允许超卖

秒杀也是类似的情况,只有1件商品,N个用户同时抢购,只有1人能抢到

这里不谈秒杀设计,不谈使用队列等使请求串行化,就谈下怎么用锁来保证数据一致性

 

常见的实现方案有以下几种:

1.代码同步, 例如使用 synchronized, lock 等同步方法

2.不查询,直接更新 update table set surplus = (surplus - buyQuantity) where id = xx and (surplus - buyQuantity) > 0

3.使用CAS, update table set  surplus = aa where id = xx and version = y

4.使用数据库锁, select xx for update

5.使用分布式锁(zookeeper, redis等)

 

下面就针对这几种方案来分析下

1.代码同步,例如使用 synchronized,lock 等同步方法

伪代码如下:

public synchronized void buy(String productName, Integer buyQuantity) {
    // 其他校验...
    // 校验剩余数量
    Product product  = 从数据库查询出记录;
    if (product.getSurplus < buyQuantity) {
        return "库存不足";
    }

    // set新的剩余数量
    product.setSurplus(product.getSurplus() - quantity);
    // 更新数据库
    update(product);
    // 记录日志...
    // 其他业务...
}

 

先说下这个方案的前提配置:

1.使用spring 声明式事务管理

2.事务传播机制使用默认的(PROPAGATION_REQUIRED)

3.项目分层为controller-service-dao 3层,事务管理在service层

 

这个方案不可行,主要是因为以下几点:

1.synchronized的作用范围是单个jvm实例,如果做了集群,分布式就没用了

2.synchronized是作用在对象实例上的,如果不是单例,则多个实例间不会同步(这个一般用spring管理bean,默认就是单例)

3.单个jvm时,synchronized也不能保证多个数据库事务的隔离性。这与代码中的事务传播级别,数据库的事务隔离级别,加锁时机等相关

 

3.1.先说下隔离级别,常用的是 Read Committed 和 Repeatable Read,另外2种不常用就不说了

3.1.1.RR(Repeatable Read)级别,mysql默认的是RR,事务开启后,不会读取到其他事务提交的数据

根据前面的前提,我们知道在调用buy方法时会开启事务

假设现在有线程T1,T2同时执行buy方法,假设T1先执行,T2等待

spring的事务开启和提交等是通过aop(代理)实现的,所以执行buy方法前,就会开启事务

这时候T1,T2是两个事务,当T1执行完后,T2执行,读取不到T1提交的数据,所以会出问题

 

3.1.2.RC(Read Committed)级别,事务开启后,可以读取到其他事务提交的数据

看起来这个级别可以解决上面的问题。T2执行时,可以读取到T1提交的结果

但是问题是,T2执行的时候,T1的事务提交了吗?

事务和锁的流程如下

1.开启事务(aop)

2.加锁(进入synchronized方法)

3.释放锁(退出synchronized方法)

4.提交事务(aop)

可以看出是先释放锁,再提交事务,所以T2执行查询,可能还是未读到T1提交的数据,还会出问题

 

3.2.根据3.1中的问题,发现主要矛盾是事务开启和提交的时机与加锁解锁时机不一致,有小伙伴们可能就想到了解决方案.

3.2.1.在事务开启前加锁,事务提交后解锁

确实是可以,这相当于事务串行化,抛开性能不谈,来谈谈怎么实现

如果使用默认的事务传播机制,那么要保证事务开启前加锁,事务提交后解锁,就需要把加锁,解锁放在controller层

这样就有个潜在问题,所有操作库存的方法,都要加锁,而且要是同一把锁,写起来挺累的

而且这样还是不能跨jvm

 

3.2.2.将查询库存,扣减库存这2步操作,单独提取个方法,单独使用事务,并且事务隔离级别设置为RC

这个其实和上面的3.2.1异曲同工,最终都是将加解锁放在了事务开启提交外层

比较而言优点是入口少了controller不用处理

缺点除了上面的不能跨jvm,还有就是 单独的这个方法,需要放到另外的service类中

因为使用spring,同一个bean的内部方法调用,是不会被再次代理的,所以配置的单独事务等需要放到另外的 service bean 中

 

2.不查询,直接更新

看完第一种方案,小伙伴就说了,说的那么复杂,这么多问题,就是因为查询的数据不是最新的吗?

我们不查询,接更新不就行啦

伪代码如下:

public synchronized void buy(String productName, Integer buyQuantity) {
    // 其他校验...
    int 影响行数 = update table set surplus = (surplus - buyQuantity) where id = 1 ;
    if (result < 0) {
        return "库存不足";
    }
    // 记录日志...
    // 其他业务...
}

 

 测试后发现库存变成-1了,继续完善下

public synchronized void buy(String productName, Integer buyQuantity) {
    // 其他校验...
    int 影响行数 = update table set surplus = (surplus - buyQuantity) where id = 1 and (surplus - buyQuantity) > 0 ;
    if (result < 0) {
        return "库存不足";
    }
    // 记录日志...
    // 其他业务...
}

 

测试后,功能OK;

这样确实可以实现,不过有一些其他问题:

1.不具备通用性,例如add操作

2.库存操作一般要记录操作前后的数量等,这样没法记录

3.其他...

但是根据这个方案,引出方案3
 

3.使用CAS,update table set surplus = aa where id = xx and yy = y

CAS是指compare/check and swap/set 意思都差不多,不必太纠结是哪个单词

我们将上面的sql修改一下:

int 影响行数 = update table set surplus = newQuantity where id = 1 and surplus = oldQuantity ;

这样,线程T1执行完后,线程T2去更新,影响行数=0,则说明数据被更新,重新查询判断执行。

伪代码如下:

public void buy(String productName, Integer buyQuantity) {
    // 其他校验...
    Product product = getByDB(productName);
    int 影响行数 = update table set surplus = (surplus - buyQuantity) where id = 1 and surplus = 查询的剩余数量 ;
    while (result == 0) {
        product = getByDB(productName);
        if (查询的剩余数量 > buyQuantity) {
            影响行数 = update table set surplus = (surplus - buyQuantity) where id = 1 and surplus = 查询的剩余数量 ;
        } else {
            return "库存不足";
        }
    }
    // 记录日志...
    // 其他业务...
}

 

看到重新查询几个字,小伙伴们应该就又想到事务隔离级别问题了

上面代码中的getByDB方法,必须单独事务(注意同一个bean内单独事务不生效哦),而且数据库的事务隔离级别必须是RC,

否则上面的代码就会是死循环了。

 

上面的方案,可能会出现一个CAS中经典问题,ABA的问题:

ABA是指:

线程T1 查询,库存剩余  100, 线程T2 查询,库存剩余  100

线程T1 执行sub update t set surplus = 100 - 10 where id = x and surplus = 100;

线程T3 查询, 库存剩余 90

线程T3 执行add update t set surplus = 90 + 10 where id = x and surplus = 90;

线程T2 执行sub update t set surplus = 100 - 10 where id = x and surplus = 100;

 这里线程T2执行的时候,库存的100已经不是查询到的100了,但是对于这个业务是不影响的。

 

一般的设计中CAS会使用version来控制

update t set surplus = 90 ,version = version + 1 where id = x and version = oldVersion ;

这样,每次更新version在原基础上+1,就可以了。

使用CAS要注意几点,

1.失败重试次数,是否需要限制

2.失败重试对用户是透明的

 

4.使用数据库锁,select xx for update

方案3中的cas,是乐观锁的实现,而select for udpate 则是悲观锁,在查询数据的时候,就将数据锁住

伪代码如下:

public void buy(String productName, Integer buyQuantity) {
    // 其他校验...
    Product product = select * from table where name = productName for update;
    if (查询的剩余数量 > buyQuantity) {
        影响行数 = update table set surplus = (surplus - buyQuantity) where name = productName ;
    } else {
        return "库存不足";
    }
    // 记录日志...
    // 其他业务...
}

 

线程T1 进行sub,查询库存剩余 100

线程T2 进行sub,这时候,线程T1事务还未提交,线程T2阻塞,直到线程T1事务提交或回滚才能查询出结果

所以线程T2查询出的一定是最新的数据,相当于事务串行化了,就解决了数据一致性问题

 

对于select for update,需要注意的有2点

1.统一入口:所有库存操作都需要统一使用 select for update,这样才会阻塞,如果另外一个方法还是普通的select,是不会被阻塞的

2.加锁顺序:如果有多个锁,那么加锁顺序要一致,否则会出现死锁

 

5.使用分布式锁(zookeeper,redis等)

使用分布式锁,原理和方案1种的synchronized是一样的,只不过synchronized的flag只有jvm进程内可见,而分布式锁的flag则是全局可见,方案4种的 select for update 的flag也是全局可见

分布式锁的实现方案有很多:基于redis,基于zookeeper,基于数据库等等。下一篇博客将介绍 基于redis的分布式锁

需要注意,使用分布式锁和synchronized锁有同样的问题,就是锁和事务的顺序,这个在方案1里面已经讲过,不再重复

 

做个简单总结

方案1:synchronized等jvm内部锁不适合用来保证数据库数据一致性,不能跨jvm

方案2:不具备通用性,不能记录操作前后日志

方案3:推荐使用,但是如果数据竞争激烈,则自动重试次数会急剧上升,需要注意

方案4:推荐使用,最简单的方案,但是如果事务过大,会有性能问题;操作不当,会有死锁问题

方案5:和方案1类似,只是能跨jvm

 

原文:

https://blog.csdn.net/qq315737546/article/details/76850173