SpringBoot整合nacos + seata实现分布式事务

时间:2022-12-17 08:55:20

准备工作

安装nacos

安装seata-server

订单服务RM实现

创建订单服务的DB

DROP TABLE IF EXISTS `tab_order`;
CREATE TABLE `tab_order`  (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `user_id` bigint(11) NULL DEFAULT NULL COMMENT '用户ID',
  `product_id` bigint(11) NULL DEFAULT NULL COMMENT '产品ID',
  `count` int(11) NULL DEFAULT NULL COMMENT '数量',
  `money` decimal(11, 0) NULL DEFAULT NULL COMMENT '金额',
  `status` int(1) NULL DEFAULT NULL COMMENT '订单状态:0:创建中;1:已完成',
  PRIMARY KEY (`id`) USING BTREE
);

创建库存服务的DB

DROP TABLE IF EXISTS `tab_storage`;
CREATE TABLE `tab_storage`  (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `product_id` bigint(11) NULL DEFAULT NULL COMMENT '产品id',
  `total` int(11) NULL DEFAULT NULL COMMENT '总库存',
  `used` int(11) NULL DEFAULT NULL COMMENT '已有库存',
  PRIMARY KEY (`id`) USING BTREE
) ;
INSERT INTO `tab_storage` VALUES (1, 1, 95, 5);
INSERT INTO `tab_storage` VALUES (2, 2, 91, 9);

各数据库加入undo_log

DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log`  (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime(0) NOT NULL,
  `log_modified` datetime(0) NOT NULL,
  `ext` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
);

创建一个名为seata-demo的springBoot父工程添加maven依赖


  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.2.RELEASE</version>
  </parent>

  <properties>
    <springboot.verison>2.4.2.RELEASE</springboot.verison>
    <java.version>1.8</java.version>
    <mybatis.version>2.1.5</mybatis.version>
    <tk-mapper.version>4.1.5</tk-mapper.version>
    <seata.version>1.3.0</seata.version>
  </properties>

  <dependencyManagement>
    <dependencies>
      <!--Mybatis通用Mapper-->
      <dependency>
        <groupId>tk.mybatis</groupId>
        <artifactId>mapper-spring-boot-starter</artifactId>
        <version>${mybatis.version}</version>
      </dependency>
      <dependency>
        <groupId>tk.mybatis</groupId>
        <artifactId>mapper</artifactId>
        <version>${tk-mapper.version}</version>
      </dependency>

      <!--SpringCloud-->
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>Hoxton.SR9</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>

      <!--Spring Alibaba Cloud-->
      <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-alibaba-dependencies</artifactId>
        <version>2.2.1.RELEASE</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

在seata-demo工程下创建订单工程order-service

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>

    <!--nacos注册中心和配置中心-->
    <dependency>
      <groupId>com.alibaba.cloud</groupId>
      <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
    <dependency>
      <groupId>com.alibaba.cloud</groupId>
      <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
    </dependency>
    <!--Mybatis通用Mapper-->
    <dependency>
      <groupId>tk.mybatis</groupId>
      <artifactId>mapper-spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>tk.mybatis</groupId>
      <artifactId>mapper</artifactId>
    </dependency>

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
    </dependency>

    <!--openfeign-->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-openfeign</artifactId>
    </dependency>
    <dependency>
      <groupId>io.github.openfeign</groupId>
      <artifactId>feign-okhttp</artifactId>
      <version>10.2.3</version>
    </dependency>

    <dependency>
      <groupId>com.alibaba.cloud</groupId>
      <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
      <exclusions>
        <exclusion>
          <groupId>io.seata</groupId>
          <artifactId>seata-spring-boot-starter</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>io.seata</groupId>
      <artifactId>seata-spring-boot-starter</artifactId>
      <version>1.3.0</version>
    </dependency>

  </dependencies>

启动类

@SpringBootApplication
@EnableDiscoveryClient
public class OrderApplication {
    public static void main( String[] args ) {
        SpringApplication.run(OrderApplication.class,args);
    }
}

application.yml

server:
  port: 6770

bootstrap.yml

spring:
  application:
    name: order-service
  profiles:
    active: dev

bootstrap-dev.yml

spring:
  cloud:
    nacos:
      discovery:
        server-addr: 服务器IP:8848
        namespace: dev
      config:
        server-addr: 服务器IP:8848
        file-extension: yml
        namespace: dev
logging:
  level:
    com.alibaba.nacos.client.config.impl: WARN

在nacos新建dev命名空间

order-service.yml

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/my_order?serverTimezone=UTC
    username: root
    password: etoak

seata:
  enabled: true
  application-id: ${spring.application.name}
  tx-service-group: my_test_tx_group #事务分组需要与seata-server端保持一致 每个微服务可以独立命名 也可以使用相同
  enable-auto-data-source-proxy: true
  data-source-proxy-mode: AT #使用AT模式
  registry:
    type: nacos
    nacos:
      group: SEATA_GROUP
      server-addr: 服务器IP:8848
      namespace: seata
  config:
    type: nacos
    nacos:
      serverAddr: 服务器IP:8848
      namespace: seata
      group: SEATA_GROUP
      dataId: seataServer.properties
       # 在安装seata-serveer 的时候 我们通过修改seata/script/config-center config.txt来配置 seata数据库 事务分组等信息 并通过 sh nocos-config.sh推送到nacos配置
       #seata-server高版本可以使用一个配置文件配置seata-server服务端配置

seataServer.properties

#For details about configuration items, see https://seata.io/zh-cn/docs/user/configurations.html
#Transport configuration, for client and server
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.enableTcServerBatchSendResponse=false
transport.rpcRmRequestTimeout=30000
transport.rpcTmRequestTimeout=30000
transport.rpcTcRequestTimeout=30000
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
transport.serialization=seata
transport.compressor=none

#Transaction routing rules configuration, only for the client
service.vgroupMapping.my_test_tx_group=default
#If you use a registry, you can ignore it
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false

#Transaction rule configuration, only for the client
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=true
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.rm.sagaJsonParser=fastjson
client.rm.tccActionInterceptorOrder=-2147482648
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
client.tm.interceptorOrder=-2147482648
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
#For TCC transaction mode
tcc.fence.logTableName=tcc_fence_log
tcc.fence.cleanPeriod=1h

#Log rule configuration, for client and server
log.exceptionRate=100

#Transaction storage configuration, only for the server. The file, db, and redis configuration values are optional.
store.mode=db
store.lock.mode=db
store.session.mode=db
#Used for password encryption
store.publicKey=


#These configurations are required if the `store mode` is `db`. If `store.mode,store.lock.mode,store.session.mode` are not equal to `db`, you can remove the configuration block.
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://localhost:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=root
store.db.password=etoak
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.distributedLockTable=distributed_lock
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000

#Transaction rule configuration, only for the server
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
server.distributedLockExpireTime=10000
server.xaerNotaRetryTimeout=60000
server.session.branchAsyncQueueSize=5000
server.session.enableBranchAsyncRemove=false
server.enableParallelRequestHandle=false

#Metrics configuration, only for the server
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898

Order实体

@Table(name = "tab_order")
@Data
@Accessors(chain = true)
public class Order {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private Long userId;

    private Long productId;

    private int count;

    private BigDecimal money;

    private int status;
}

orderController

@RestController
@RequestMapping(value = "/order")
public class OrderController {

    @Resource
    private OrderService orderService;


    @PostMapping("/create")
    public Boolean createOrder(long userId , long productId){
        Order order = new Order();
        order.setCount(1)
                .setMoney(BigDecimal.valueOf(88))
                .setProductId(productId)
                .setUserId(userId)
                .setStatus(0);
        return orderService.create(order);
    }

}

OrderService

@Slf4j
@Service
public class OrderService {

    @Resource
    private OrderMapper orderMapper;

    public Boolean create(Order order) {
        log.info("创建订单开始");
        int index = orderMapper.insert(order);
        log.info("创建订单结束");
        return index > 0;
    }
}

OrderMapper

@Mapper
public interface OrderMapper extends BaseMapper<Order> {
}

库存服务RM实现

在seata-demo下新建一个名为stock-service的服务

pom文件与自启动类注解与order服务一致

yml只需要修改服务端口号和服务名以及mysql连接数据库即可

Stock实体类
@Table(name = "tab_order")
@Data
@Accessors(chain = true)
public class Stock {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private Long total;

    private Long productId;

    private Long used;

}

StockController

@RestController
@RequestMapping("/stock")
public class StockController {

    @Resource
    private StockService stockService;

    @PostMapping("/change")
    public Boolean changeStorage(long productId , int used)  {
        return stockService.updateUseNum(productId , used);
    }
}

StockService

@Slf4j
@Service
public class StockService {

    @Resource
    private StockMapper storageMapper;
    
    public boolean updateUseNum(long productId , long used) {

        int i = 1/0;

        int index = storageMapper.updateUsed(productId, used);
        return index > 0;
    }
}

StockMapper

@Mapper
public interface StockMapper extends BaseMapper<Stock> {

    @Update("update tab_storage set total = total - #{currentUsed} , used = used + #{currentUsed} where product_id = #{productId}")
    int updateUsed(@Param("productId") long productId , @Param("currentUsed") long currentUsed);

}

搭建TM业务中台biz-service

在seata-demo服务下新建一个biz-service服务

pom

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>

    <!--nacos注册中心和配置中心-->
    <dependency>
      <groupId>com.alibaba.cloud</groupId>
      <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
    <dependency>
      <groupId>com.alibaba.cloud</groupId>
      <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
    </dependency>


    <!--openfeign-->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-openfeign</artifactId>
    </dependency>
    <dependency>
      <groupId>io.github.openfeign</groupId>
      <artifactId>feign-okhttp</artifactId>
      <version>10.2.3</version>
    </dependency>

    <dependency>
      <groupId>com.alibaba.cloud</groupId>
      <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
      <exclusions>
        <exclusion>
          <groupId>io.seata</groupId>
          <artifactId>seata-spring-boot-starter</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>io.seata</groupId>
      <artifactId>seata-spring-boot-starter</artifactId>
      <version>1.3.0</version>
    </dependency>
  </dependencies>

主启动类多了一个@EnableFeignClients

@EnableFeignClients
@EnableDiscoveryClient
@SpringBootApplication
public class BizApplication {
    public static void main( String[] args ) {
        SpringApplication.run(BizApplication.class,args);
    }
}

yml需要修改端口号服务名与mysql数据库

BizController
seata开启全局事务只需要 加上@GlobalTransactional即可

@RestController
public class BizController {

    @Resource
    private OrderApi orderApi;

    @Resource
    private StockApi stockApi;

    @GetMapping("buy")
    @GlobalTransactional
    public String buy(long userId , long productId){
        orderApi.create(userId , productId);
        stockApi.changeStorage(userId , 1);
        return "ok";
    }
}

使用Feign调用Order和Stock服务

OrderAPI

@FeignClient(name = "order-service",path = "order")
@Component
public interface OrderApi {
    @PostMapping("create")
    Boolean create(@RequestParam("userId") long userId ,@RequestParam("productId") long productId);

}

StockApi

@FeignClient(name = "stock-service",path = "stock")
@Component
public interface StockApi {

    @PostMapping("change")
    Boolean changeStorage(@RequestParam("productId") long productId , @RequestParam("used")  int used);
}