一次并发的update引发的血案 ----rocketMQ 多实例消费导致的错误。

时间:2024-05-23 16:06:42

事故现场:

      系统引入了rocketMQ,用来对某些订单进行状态的更新操作。部署的时候2台消费实例。在测试环境中只考虑了用乐观锁来控制并发,但在生产中,乐观锁并无法保证数据的正确性。

一次并发的update引发的血案 ----rocketMQ 多实例消费导致的错误。

 

具体的消费情况参考上图。

假定消息A和消息B 都是为了更新表中的同一行数据,更新逻辑是,拿到消息A 进行更新,拿到消息B 在更新一次。最终得到的结果为,消息B。

生产环境中,消息A\B 有可能同时上传。且同时被两个实例拿到。也同时满足更新条件,但我们要的结果应该是消息B,用消息B去更新表中的原始数据。但在并发的情况下,有可能在B 更新之后,A并不知道B 更新了,它会也更新一次,导致最终的结果错误。

 

解决思路:

      1、使用分布式锁

分布式有很多种实现,比如redis,zookeeper。下面介绍redis先。

利用redis的原子性,每次在更新之前,set一个key,更新完成后,删除key。如果两个消息同时消费,应用只会处理其中的一个,另一个可以打回重新消费。优点是 实现起来比较快,springboot框架又比较成熟的配置,使用简单。缺点也很明显,太过依赖redis,如果redis故障,会导致整个系统出错。

      2、数据库加锁。

对当前需要操作的数据加行锁,只允许一个线程进行修改,其他线程都在等待修改线程释放锁。优点是可靠性高,缺点是操作不当可能导致死锁。

 

思虑再三,决定使用第二种方法。

下面附上相关代码。

 

@Autowired
@Qualifier("collapsarTransactionTemplate")
private TransactionTemplate transactionTemplate;
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
    @Override
    protected void doInTransactionWithoutResult(TransactionStatus status) {
        try {
            //logger.info("数据开始更新~ data {}", JsonUtil.toJSON(interceptorOrder));
            Example example1Lock = new Example(InterceptorOrder.class);
            example1Lock.createCriteria().andEqualTo("id",interceptorOrder1.getId());
            //interceptorOrderMapper.concurrentTest();
            interceptorOrderMapper.updateByExampleSelective(interceptorOrder, example1Lock);
            //logger.info("数据更新完成~");
        } catch (Exception e) {
            logger.error("操作数据库发生错误!! errMsg {}", ExceptionUtils.getStackTrace(e));
        }
    }
});

-------------------------------我是分割线--------------------------------------------------------------------------------------------------

<update id="updateOrderStatus" parameterType="com.xx.interceptor.model.InterceptorOrder">
  UPDATE interceptor_order t1 ,
  (select * from interceptor_order t2 where t2.bill_code=#{billCode,jdbcType=VARCHAR}) as b
    <trim prefix="set" suffixOverrides=",">
    <if test="billCode != null">
        t1.bill_code = #{billCode,jdbcType=VARCHAR},
    </if>
    <if test="interceptSiteCode != null">
        t1.intercept_site_code = #{interceptSiteCode,jdbcType=VARCHAR},
    </if>
    <if test="interceptSiteName != null">
        t1.intercept_site_name = #{interceptSiteName,jdbcType=VARCHAR},
    </if>
    <if test="interceptStatus != null">
        t1.intercept_status = #{interceptStatus,jdbcType=INTEGER},
    </if>
    <if test="operaSiteCode != null">
        t1.opera_site_code = #{operaSiteCode,jdbcType=VARCHAR},
    </if>
    <if test="operaSiteName != null">
        t1.opera_site_name = #{operaSiteName,jdbcType=VARCHAR},
    </if>
    <if test="operaUserId != null">
        t1.opera_user_id = #{operaUserId,jdbcType=VARCHAR},
    </if>
    <if test="operaUserCode != null">
        t1.opera_user_code = #{operaUserCode,jdbcType=VARCHAR},
    </if>
    <if test="operaUserName != null">
        t1.opera_user_name = #{operaUserName,jdbcType=VARCHAR},
    </if>
    <if test="operaTime != null">
        t1.opera_time = #{operaTime,jdbcType=DATE},
    </if>
    </trim>
  WHERE  t1.bill_code = #{billCode,jdbcType=VARCHAR} and b.opera_time is null
</update>

------------------------------------分割线---------------------------------------------------------------

UPDATE interceptor_order inner join(select '54' as id, sleep(5)) a on interceptor_order.id=a.id   SET bill_code = '281050304306',intercept_site_code = '93530',
intercept_site_name = '甘肃永昌',intercept_status = 3,opera_site_code = '93530',opera_site_name = '甘肃永昌',
opera_user_id = '5021864',opera_user_code = '93530.012',opera_user_name = '余有东',opera_time = '2018-10-01 01:36:34' WHERE ( interceptor_order.id = 54 )

 

第一段代码为获取行级锁,并通过主键id进行update。

第二段代码,每次update都会查询是否满足更新条件。

第三段SQL,update并进行休眠,模拟开启事务,并获取行级锁,这个时候其他线程的update都处于等待状态。

 

 

参考:

https://blog.****.net/L_BestCoder/article/details/79298417 --乐观锁、悲观锁

https://blog.****.net/canot/article/details/53815294  mysql的事务和锁