分布式事务的几种实现(阿里seata)

时间:2021-07-26 01:19:11


seata的部署和配置

Seata中文官网

TCC模式的实现

通过阿里的Seata去运用TCC模式

一个分布式的全局事务,整体是 两阶段提交 的模型。全局事务是由若干分支事务组成的,分支事务要满足 两阶段提交 的模型要求,即需要每个分支事务都具备自己的:

  • 一阶段 prepare 行为
  • 二阶段 commit 或 rollback 行为

定义TCC接口

由于我们使用的是SpringCloud+Feign,Feign的调用基于http,因此此处我们使用LocalTCC便可。值得注意的是,@LocalTCC一定需要注解在接口上,此接口可以是寻常的业务接口,只要实现了TCC的两阶段提交对应方法便可。

  • @LocalTCC 适用于SpringCloud+Feign模式下的TCC
  • @TwoPhaseBusinessAction 注解try方法,其中name为当前tcc方法的bean名称,写方法名便可(记得全局唯一),commitMethod指向提交方法,rollbackMethod指向事务回滚方法。指定好三个方法之后,seata会根据全局事务的成功或失败,去帮我们自动调用提交方法或者回滚方法。
  • @BusinessActionContextParameter 注解可以将参数传递到二阶段(commitMethod/rollbackMethod)的方法。
    BusinessActionContext 便是指TCC事务上下文

下面根据模型定义Service和实现,定义三个方法,prepare行为,commit和rollback方法。

@LocalTCC
public interface TccService {
    /**
     * 定义两阶段提交prepare方法
     * name = 该tcc的bean名称,全局唯一
     * commitMethod = commit 为二阶段确认方法
     * rollbackMethod = rollback 为二阶段取消方法
     * BusinessActionContextParameter注解 传递参数到二阶段中
     *
     * @return String
     */
    @TwoPhaseBusinessAction(name = "insert", commitMethod = "commitTcc", rollbackMethod = "cancel")
    String insert(Long productId,Integer count);


    /**
     * commit方法、可以另命名,但要保证与commitMethod一致
     * context可以传递try方法的参数
     *
     * @param context 上下文
     * @return boolean
     */
    boolean commitTcc(BusinessActionContext context);
    /**
     * 二阶段rollback方法
     *
     * @param context 上下文
     * @return boolean
     */
    boolean cancel(BusinessActionContext context);



}
@Slf4j
@Service
public class TccServiceImpl implements TccService {

    @Autowired
    private ResourceStorageService resourceStorageService;

    /**
     * tcc服务t(try)方法
     * 根据实际业务场景选择实际业务执行逻辑或者资源预留逻辑
     *
     * @return String
     */
    @Override
    @PostMapping("/tcc-insert")
    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
    public String insert(Long productId,Integer count) {
        log.info("xid = " + RootContext.getXID());
        resourceStorageService.decrease(productId, count);
        //放开以下注解抛出异常
//        throw new RuntimeException("服务tcc测试回滚");
        return "success";
    }

    /**
     * tcc服务 confirm方法
     * 若一阶段采用资源预留,在二阶段确认时要提交预留的资源
     *
     * @param context 上下文
     * @return boolean
     */
    @Override
    public boolean commitTcc(BusinessActionContext context) {
        log.info("xid = " + context.getXid() + "提交成功");
        // 若try成功,一阶段资源预留,这里则要提交资源
        return true;
    }

    /**
     * tcc 服务 cancel方法
     *
     * @param context 上下文
     * @return boolean
     */
    @Override
    public boolean cancel(BusinessActionContext context) {
        //todo 这里写回滚操作
        System.out.println("please manually rollback this data:" + context.getActionContext("params"));
        return true;
    }

}

提供调用方法:

@Api(tags = "资源库存")
@RestController
@RequestMapping("/storage")
public class StorageController {
    

    @Autowired
    private TccService tccService;

    /**
     * 扣减库存
     */
    @RequestMapping("/decrease")
    public CommonResult decrease(Long productId, Integer count) {
        
        tccService.insert(productId,count);
        return new CommonResult("扣减库存成功!",200);
    }
}

下面通过feign调用,这里写调用方:

@Api(tags = "分布式-接口测试demo")
@RestController
@RequestMapping(value = "/v1/distributed")
public class DistributedController {


    /**
     * 库存服务,减少库存
     */
    @Resource
    private FeignStorageService feignStorageService;

    /**
     * 增加一条主题内容
     */
    @Resource
    private TopicService topicService;

    @GetMapping("/transaction")
    @ApiOperation(value = "分布式事务seata测试")
    @GlobalTransactional(name = "fsp-tra", rollbackFor = Exception.class)
    public CommonResult transaction() {
        //减少库存
        feignStorageService.decrease(1L, 1);
        //增加一条topic
        TopicDO topicDO = new TopicDO();
        topicDO.setName("购物热");
        topicService.save(topicDO);
        //执行出错
        int a = 1 / 0;
        return CommonResult.success("");
    }

}

库存的feign调用客户端

/**
 * @author haitao.li
 * @description: 库存
 * @date 2021/10/25 10:26
 */
@FeignClient(value = "tcly-storage")
public interface FeignStorageService {

    /**
     * 扣减库存
     * @param productId 查询id
     * @param count 产品数量
     * @return 扣除结果
     */
    @RequestMapping("/storage/decrease")
    CommonResult<Object> decrease(@RequestParam Long productId,@RequestParam  Integer count);
}

上面的代码,调用的时候,通过@GlobalTransactional(name = “fsp-tra”, rollbackFor = Exception.class)进行全局事务管理。

下面根据以上代码进行测试。

调用到最后的时候,一行代码出错: int a = 1 / 0; 正常情况 下,扣减库存和插入topic都会失败

执行前的storage:

分布式事务的几种实现(阿里seata)

执行前的topic

分布式事务的几种实现(阿里seata)

执行后结果:

topic没有添加,但是库存扣减成功了(因为我这里的cancel方法里没有做实际的回滚操作,看下面,如果实际执行了cancel则说明TCC正常)。

下面是库存服务 中的两阶段结果:

分布式事务的几种实现(阿里seata)

分布式事务的几种实现(阿里seata)

全局事务执行成功,并且库存服务的两阶段模型执行了第二阶段的cancel操作。

如果开启了@LocalTCC,那么必须通过cancel方法进行回滚。

XA/AT(2PC)

一个下订单的例子,进行扣减库存,生成订单,余额扣除。

分布式事务的几种实现(阿里seata)

通过业务模块business进行分布式 事务全局管控,storage,account,order三个微服务之间通过feign调用并处理事务。

样例场景是 Seata 经典的,涉及库存、订单、账户 3 个微服务的商品订购业务。

在样例中,上层编程模型与 AT 模式完全相同。只需要修改数据源代理,即可实现 XA 模式与 AT 模式之间的切换。

@Bean("dataSource")
    public DataSource dataSource(DruidDataSource druidDataSource) {
        // DataSourceProxy for AT mode
        // return new DataSourceProxy(druidDataSource);

        // DataSourceProxyXA for XA mode
        return new DataSourceProxyXA(druidDataSource);
    }

完整代码(下载后根据以下步骤):

下载demo:seata-xa

准备工作
  1. 执行sql/all_in_one.sql
  2. 下载最新版本的 Seata Sever
  3. 解压并启动 Seata server
unzip seata-server-xxx.zip

cd distribution
sh ./bin/seata-server.sh 8091 file
  1. 启动 AccountXA, OrderXA, StorageXA, BusinessXA 服务
测试
  • 无错误成功提交
curl http://127.0.0.1:8084/purchase

具体调用参数请结合 BusinessController 的代码。

数据初始化逻辑,参见 BusinessService#initData() 方法。

基于初始化数据,和默认的调用逻辑,purchase 将可以被成功调用 3 次。

每次账户余额扣减 3000,由最初的 10000 减少到 1000。

第 4 次调用,因为账户余额不足,purchase 调用将失败。相应的:库存、订单、账户都回滚。

XA 模式与 AT 模式

只要切换数据源代理类型,该样例即可在 XA 模式和 AT 模式之间切换。

DataSourceConfiguration

XA 模式使用 DataSourceProxyXA

public class DataSourceProxy {

    @Bean("dataSourceProxy")
    public DataSource dataSource(DruidDataSource druidDataSource) {
        // DataSourceProxyXA for XA mode
        return new DataSourceProxyXA(druidDataSource);
    }
}

AT 模式使用 DataSourceProxy

public class DataSourceProxy {

    @Bean("dataSourceProxy")
    public DataSource dataSource(DruidDataSource druidDataSource) {
        // DataSourceProxyXA for AT mode
        return new DataSourceProxy(druidDataSource);
    }
}

当然,AT 模式需要在数据库中建立 undo_log 表。(XA 模式是不需要这个表的)

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

saga模式

saga模式