环境准备
1、开发工具:SpringToolSuite 4
2、Spring-Cloud版本:Greenwich.SR2
3、Seata1.3
4、Mybatis-Plus
5、Mysql
Seata简介
Seata 是一款由阿里巴巴中间件团队发起了开源项目,致力于解决微服务架构下提供高性能和简单易用的分布式事务。支持AT、TCC、SAGA、XA分布式模型,对微服务框架也有良好的支持。
如图上所示,Seata 中有三大模块,分别是 TM、RM 与 TC, 其中 TM、 RM 是作为 Seata 的客户端与业务系统集成在一起,TC 作为 Seata 的服务端独立部署。
- TC - 事务协调者 维护全局和分支事务的状态,驱动全局事务提交或回滚。
- TM - 事务管理器定义全局事务的范围:开始全局事务、提交或回滚全局事务。
- RM - 资源管理器管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
以上是大概的说明,具体的在其它文章再去介绍了,最近更新了Seata 1.3.0 的版本,具体feature更新如下:
支持 MySQL 多主键
支持 Redis 存储模式
Saga 流程设计器 Groovy Script Task
Server 支持 HikariCP 数据源
支持根据连续错误数动态升降级
支持事务注解类标注
协议新增 LZ4 压缩支持
Server 支持版本检查
支持 Oracle 同一实例下不同用户的事务
支持使用 Nacos 注册中心配置 group 属性
支持 ACM 配置中心
支持 update 操作回滚所有数据列和更新列
StateHandlerInterceptor 和 StateRouterInterceptor 支持 SPI
Server 鉴权支持 SPI
TCC 模式支持 Dubbo 和 Sofa-RPC 注解调用
Saga 模式支持 jackson parser
增加 zookeeper 序列化支持
支持 array, datalink 等 JDBC 类型
xid 生成支持雪花算法
支持配置缓存,去除配置中心强依赖
下面来讲讲怎么与微服务架构SpringCloud、Eureka、Mybatis-Plus一起应用于项目中。
项目搭建
Seata服务端安装
下载
- Seata 1.3.0的下载地址https://github.com/seata/(下载有点慢)
- 也可以在CSDN的空间中下载,地址如下:下载(下载速度快,只有windows的版本)
解压文件
配置文件修改
Seata配置需修改几个配置文件,分别为file.conf、registry.conf
-
file.conf
下把mode修改为用db的方式存储
配置db的地址,修改为自己要用的数据库地址,支持mysql/oracle/postgresql/h2/oceanbase,同时也支持高性能的key-value数据库Redis
## database store property
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
datasource = "druid"
## mysql/oracle/postgresql/h2/oceanbase etc.
dbType = "mysql"
driverClassName = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3307/seata"
user = "root"
password = ""
minConn = 5
maxConn = 30
globalTable = "global_table"
branchTable = "branch_table"
lockTable = "lock_table"
queryLimit = 100
maxWait = 5000
}
-
registry.conf
文件
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "eureka" #支持多种注册中心,这里选择的是Eureka
nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = ""
cluster = "default"
username = ""
password = ""
}
eureka {
serviceUrl = "http://localhost:8761/eureka" #注册中心的地址
application = "seata-eureka" ## 注册到注册中心应用的名称,这里要注册客户端的配置需和这里的名称一致
weight = "1" #权重为默认值
}
redis {
serverAddr = "localhost:6379"
db = 0
password = ""
cluster = "default"
timeout = 0
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = ""
password = ""
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
appId = "seata-server"
apolloMeta = "http://192.168.1.204:8801"
namespace = "application"
}
zk {
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file { ##默认以文件的方式存储配置
name = "file.conf"
}
}
以上Seata已配置好,后面运行的时再一起启动测试了,启动运行bin/seata-server.bat
数据库准备
Seata数据库初始化
数据库中需要创建三张表:global_table,branch_table,lock_table
- 在项目的目录中打文件夹为db的目录,下面有SQL的初始化脚本
- 同时也可以去官网下载SQL脚本文件,下载位置https://github.com/seata/seata/tree/develop/script/server/db
项目数据库初始化
在每个子项目的db文件中用于初始化业务客户端的数据库
说明:每个业务客户端都必须有undo_log表,undo_log是用于记录事务前后的数据镜像
以上的准备工作和数据初始化工作就告一段落。
项目目录
springcloud-eureka-seata -- 项目主目录
├── eureka-server -- eureka服务
├── order-provider -- 订单服务
└── seata-storage-provider -- 库存服务
Eureka项目
用Maven创建项目结构,用pom的文件创建项目
- pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocatinotallow="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>springcloud-eureka-seata</groupId>
<artifactId>springcloud-eureka-seata</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
<description>SpringCloud+Eureka+Mybatis-Plus+Seata整合</description>
<modules>
<module>eureka-server</module>
<module>order-provider</module>
<module>storage-provider</module>
</modules>
<properties>
<spring-boot.version>2.1.3.RELEASE</spring-boot.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-cloud.version>Greenwich.SR2</spring-cloud.version>
<java.version>1.8</java.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<druid.version>1.1.9</druid.version>
<seata.version>1.3.0</seata.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.8.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.1.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
<version>2.1.3.RELEASE</version>
</dependency>
<!-- mybatis-plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--seata -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
<version>2.1.0.RELEASE</version>
<exclusions>
<exclusion>
<artifactId>seata-all</artifactId>
<groupId>io.seata</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>${seata.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
- EurekaServerApplication
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
/**
* EurekaServer 启动入口
* @author thomashe
* @date 2020年7月17日
*/
@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(EurekaServerApplication.class, args);
}
}
Seata业务客户端项目搭建
库存服务提供者(storage-provider)
创建项目目录
配置文件
file.conf、registry.conf
以上两个文件必须在业务客户端中存在,并放在resources
目录下,如果业务客户端中没有这两个文件会出现读取错误。
- file.conf
文件中注意service下的内容,vgroupMapping.sos-create-order = "seata-eureka" #sos-create-order要与properties中的 配置cloud:alibaba:seata:tx-service-group: sos-create-order 一致value中的"seata-eureka"要需要与服务端文件registry.conf中的application描述一致(也可以去官网上去参考)
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = true
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThread-prefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#transaction service group mapping
vgroupMapping.sos-create-order = "seata-eureka"
#only support when registry.type=file, please don't set multiple addresses
default.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
reportSuccessEnable = false
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
}
undo {
dataValidation = true
logSerialization = "jackson"
logTable = "undo_log"
}
log {
exceptionRate = 100
}
}
-
registry.conf
我们这里用eureka作注册中心,所以,只用修改registry{}中是注册中心相关配置,config{}中是配置中心相关配置。seata中,注册中心和配置中心是分开实现的,在项目中起的作用也是不同的:
```bash
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "eureka"
nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = ""
cluster = "default"
username = ""
password = ""
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
application = "seata-eureka"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = 0
password = ""
cluster = "default"
timeout = 0
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = ""
password = ""
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
appId = "seata-server"
apolloMeta = "http://192.168.1.204:8801"
namespace = "application"
}
zk {
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
application.yml
# eureka client
eureka:
instance:
hostname: localhost
prefer-ip-address: true
client:
serviceUrl:
defaultZone: http://${eureka.instance.hostname}:8761/eureka/
logging:
level:
io.seata: info
com.baomidou: info
# MyBatis
mybatis-plus:
# 配置mapper的扫描,找到所有的mapper.xml映射文件
mapper-locations: classpath*:mapper/**/*Mapper.xml
# 搜索指定包别名
typeAliasesPackage: com.bitter.**.domain
configuration:
map-underscore-to-camel-case: true
cache-enabled: true
call-setters-on-nulls: true
jdbc-type-for-null: 'null'
server:
port: 8182
spring:
application:
name: storage-server
cloud:
alibaba:
seata:
tx-service-group: sos-create-order
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3307/seata-storage?&characterEncoding=utf8&useSSL=true&serverTimezone=UTC
username: root
password:
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.FeignClient;
/**
* 库存提供者启动入口
*
* @author thomashe
* @date 2020年7月17日
*/
@FeignClient
@EnableEurekaClient
@MapperScan(basePackages = "com.bitter.storage.mapper")
@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class }) // 取消数据源的自动创建。使用我们自己配置的seata代理的数据源
public class StorageProviderApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(StorageProviderApplication.class, args);
}
}
业务模块
- StorageVo
@TableName("t_storage")
public class StorageVo {
/**
* 主键
*/
private Integer id;
/**
* 产品标识
*/
@TableField("productId")
private Long productId;
/**
* 总库存
*/
private Integer total;
- StorageMapper
package com.bitter.storage.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.bitter.storage.domain.StorageVo;
public interface StorageMapper extends BaseMapper<StorageVo>{
}
- IStorageService\StorageService
public interface IStorageService {
public String decrease(Long productId, Integer total);
}
@Service
public class StorageService implements IStorageService {
private static Logger log = LoggerFactory.getLogger(StorageService.class);
@Autowired
StorageMapper storageMapper;
public String decrease(Long productId, Integer total) {
StorageVo storageVo = new StorageVo();
storageVo.setProductId(productId);
int org_total = getTotal(productId);
log.info("########现有的库存数据为:{}########", org_total);
storageVo.setTotal((getTotal(productId) - total));
UpdateWrapper<StorageVo> updateWrapper = new UpdateWrapper<>();
updateWrapper.lambda().eq(StorageVo::getProductId, productId);
log.info("########进入更新库存开始########");
storageMapper.update(storageVo, updateWrapper);
log.info("########进入更新库存结束########");
return "decrease";
}
}
- StorageController
package com.bitter.storage.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.bitter.storage.domain.StorageVo;
import com.bitter.storage.mapper.StorageMapper;
import com.bitter.storage.service.IStorageService;
import feign.Param;
/**
*
* @author thomashe
* */
@RestController
@RequestMapping("/storage")
public class StorageController {
@RequestMapping("/decrease")
public String decrease(@RequestParam("productId") Long productId, @RequestParam("total") Integer total) {
iStorageService.decrease(productId, total);
return "successfully.";
}
}
- StorageProviderApplication
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.FeignClient;
/**
* 库存提供者启动入口
*
* @author thomashe
* @date 2020年7月17日
*/
@FeignClient
@EnableEurekaClient
@MapperScan(basePackages = "com.bitter.storage.mapper")
@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class }) // 取消数据源的自动创建。使用我们自己配置的seata代理的数据源
public class StorageProviderApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(StorageProviderApplication.class, args);
}
}
- DataSourceConfiguration
package com.bitter;
import javax.sql.DataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.type.JdbcType;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.transaction.SpringManagedTransactionFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import com.alibaba.druid.pool.DruidDataSource;
import com.baomidou.mybatisplus.core.MybatisConfiguration;
import com.baomidou.mybatisplus.core.MybatisXMLLanguageDriver;
import io.seata.rm.datasource.DataSourceProxy;
/**
* 配置代理数据源
* @author thomashe
*
*/
@Configuration
public class DataSourceConfiguration {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource druidDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
return druidDataSource;
}
@Primary
@Bean("dataSource")
public DataSourceProxy dataSource(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
@Bean
public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSourceProxy);
sqlSessionFactoryBean.setTypeAliasesPackage("com.bitter.storage.domain");
MybatisConfiguration configuration = new MybatisConfiguration();
configuration.setDefaultScriptingLanguage(MybatisXMLLanguageDriver.class);
configuration.setJdbcTypeForNull(JdbcType.NULL);
sqlSessionFactoryBean.setConfiguration(configuration);
sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
return sqlSessionFactoryBean.getObject();
}
}
order-provider
业务客户端的代码类似,这里对不同之处说下,
- 创建fegin的运程调用类
@FeignClient(value = "STORAGE-SERVER") //库存服务名
public interface StorageFeginApi {
/**
* 更新库存
*
* @param productId
* @param count
* @return
*/
@GetMapping(value = "/storage/decrease")
String decrease(@RequestParam("productId") Long productId, @RequestParam("total") Integer total);
}
- 对要增加分布式事务的service方法增加注解,在
OrderServiceImpl
类的业务操作方法上面增加@GlobalTransactional
的分布式事务的注解,注解上面的参数名需与配置文件一致
@GlobalTransactional(name = "sos-create-order", rollbackFor = Exception.class)
public int create(OrderVo orderVo) {
orderMapper.insert(orderVo); // 创建订单
storageApi.decrease(orderVo.getProduct_id(), orderVo.getTotal());//库存更新
if (orderVo.getTotal() == 1000) {//异常测试
throw new RuntimeException("当参数为1000时,事务回滚,数据无法保存");
}
return 0;
}
运行结果
- 依次启动:
eureka-server/EurekaServerApplication
storage-provider/StorageProviderApplication
order-provider/OrderProviderApplication
- 启动Seata的服务端(注意要先启动eureka才可以启动Seata的服务端,不然运行会出错)
- 用Postman测试
- 测试结果
if (orderVo.getTotal() == 1000) {//异常测试
throw new RuntimeException("当参数为1000时,事务回滚,数据无法保存");
}
数量为1000时会出现异常,数据没有插入成功,其它数量都是可用的。
参考资料
https://github.com/seata/seata-samples/tree/master/springcloud-eureka-feign-mybatis-seata
源码下载地址
https://gitee.com/viphzc/seata
如有问题,欢迎有问题及时交流。
授人以鱼不如授人以渔...