事故现场:
系统引入了rocketMQ,用来对某些订单进行状态的更新操作。部署的时候2台消费实例。在测试环境中只考虑了用乐观锁来控制并发,但在生产中,乐观锁并无法保证数据的正确性。
具体的消费情况参考上图。
假定消息A和消息B 都是为了更新表中的同一行数据,更新逻辑是,拿到消息A 进行更新,拿到消息B 在更新一次。最终得到的结果为,消息B。
生产环境中,消息A\B 有可能同时上传。且同时被两个实例拿到。也同时满足更新条件,但我们要的结果应该是消息B,用消息B去更新表中的原始数据。但在并发的情况下,有可能在B 更新之后,A并不知道B 更新了,它会也更新一次,导致最终的结果错误。
解决思路:
1、使用分布式锁
分布式有很多种实现,比如redis,zookeeper。下面介绍redis先。
利用redis的原子性,每次在更新之前,set一个key,更新完成后,删除key。如果两个消息同时消费,应用只会处理其中的一个,另一个可以打回重新消费。优点是 实现起来比较快,springboot框架又比较成熟的配置,使用简单。缺点也很明显,太过依赖redis,如果redis故障,会导致整个系统出错。
2、数据库加锁。
对当前需要操作的数据加行锁,只允许一个线程进行修改,其他线程都在等待修改线程释放锁。优点是可靠性高,缺点是操作不当可能导致死锁。
思虑再三,决定使用第二种方法。
下面附上相关代码。
@Autowired @Qualifier("collapsarTransactionTemplate") private TransactionTemplate transactionTemplate;
transactionTemplate.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(TransactionStatus status) { try { //logger.info("数据开始更新~ data {}", JsonUtil.toJSON(interceptorOrder)); Example example1Lock = new Example(InterceptorOrder.class); example1Lock.createCriteria().andEqualTo("id",interceptorOrder1.getId()); //interceptorOrderMapper.concurrentTest(); interceptorOrderMapper.updateByExampleSelective(interceptorOrder, example1Lock); //logger.info("数据更新完成~"); } catch (Exception e) { logger.error("操作数据库发生错误!! errMsg {}", ExceptionUtils.getStackTrace(e)); } } });
-------------------------------我是分割线--------------------------------------------------------------------------------------------------
<update id="updateOrderStatus" parameterType="com.xx.interceptor.model.InterceptorOrder"> UPDATE interceptor_order t1 , (select * from interceptor_order t2 where t2.bill_code=#{billCode,jdbcType=VARCHAR}) as b <trim prefix="set" suffixOverrides=","> <if test="billCode != null"> t1.bill_code = #{billCode,jdbcType=VARCHAR}, </if> <if test="interceptSiteCode != null"> t1.intercept_site_code = #{interceptSiteCode,jdbcType=VARCHAR}, </if> <if test="interceptSiteName != null"> t1.intercept_site_name = #{interceptSiteName,jdbcType=VARCHAR}, </if> <if test="interceptStatus != null"> t1.intercept_status = #{interceptStatus,jdbcType=INTEGER}, </if> <if test="operaSiteCode != null"> t1.opera_site_code = #{operaSiteCode,jdbcType=VARCHAR}, </if> <if test="operaSiteName != null"> t1.opera_site_name = #{operaSiteName,jdbcType=VARCHAR}, </if> <if test="operaUserId != null"> t1.opera_user_id = #{operaUserId,jdbcType=VARCHAR}, </if> <if test="operaUserCode != null"> t1.opera_user_code = #{operaUserCode,jdbcType=VARCHAR}, </if> <if test="operaUserName != null"> t1.opera_user_name = #{operaUserName,jdbcType=VARCHAR}, </if> <if test="operaTime != null"> t1.opera_time = #{operaTime,jdbcType=DATE}, </if> </trim> WHERE t1.bill_code = #{billCode,jdbcType=VARCHAR} and b.opera_time is null </update>
------------------------------------分割线---------------------------------------------------------------
UPDATE interceptor_order inner join(select '54' as id, sleep(5)) a on interceptor_order.id=a.id SET bill_code = '281050304306',intercept_site_code = '93530', intercept_site_name = '甘肃永昌',intercept_status = 3,opera_site_code = '93530',opera_site_name = '甘肃永昌', opera_user_id = '5021864',opera_user_code = '93530.012',opera_user_name = '余有东',opera_time = '2018-10-01 01:36:34' WHERE ( interceptor_order.id = 54 )
第一段代码为获取行级锁,并通过主键id进行update。
第二段代码,每次update都会查询是否满足更新条件。
第三段SQL,update并进行休眠,模拟开启事务,并获取行级锁,这个时候其他线程的update都处于等待状态。
参考:
https://blog.****.net/L_BestCoder/article/details/79298417 --乐观锁、悲观锁
https://blog.****.net/canot/article/details/53815294 mysql的事务和锁