分布式事务Seata源码解析九:分支事务如何注册到全局事务

时间:2022-10-22 13:54:19

一、前言

更多内容见Seata专栏:https://blog.csdn.net/saintmm/category_11953405.html

至此,seata系列的内容包括:

  1. can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决
  2. Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
  3. Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
  4. 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
  5. 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
  6. 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
  7. 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么
  8. 【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
  9. 【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信
  10. 【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务
  11. 【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?
  12. 【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务
  13. 分布式事务Seata源码解析八:本地事务执行流程(AT模式下)

Seata最核心的全局事务执行流程,上文我们聊了本地事务是如何执行的?在本地事务执行的过程中涉及到分支事务如何注册到全局事务、undo log的构建,本文我们接着聊分支事务如何注册到全局事务。

二、RM中分支事务注册入口

在上一文(分布式事务Seata源码解析八:本地事务执行流程(AT模式下))中,提到ConnectionProxy#processGlobalTransactionCommit()最终处理本地事务的提交。

分布式事务Seata源码解析九:分支事务如何注册到全局事务

其中register()方法向远程的TC中注册分支事务:

private void register() throws TransactionException {
    if (!context.hasUndoLog() || !context.hasLockKey()) {
        return;
    }

    // 分支事务注册:将事务类型AT、资源ID(资源在前面的流程已经注册过了)、事务xid、全局锁keys作为分支事务信息注册到seata server
    Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
            null, context.getXid(), context.getApplicationData(), context.buildLockKeys());
    context.setBranchId(branchId);
}

注册分支事务、获取分支事务ID的入口流程如下:

  1. DefaultResourceManager.get() 获取单例形式的资源管理器DefaultResourceManager,通过其注册分支事务;
  2. 再根据分支类型(AT、TCC、XA、SAGA)获取相应类型的ResourceManager;
    分布式事务Seata源码解析九:分支事务如何注册到全局事务

因为存在四种分布式事务的模式(AT、TCC、XA、SAGA),所以此处也正好对应四种ResourceManager:

分布式事务Seata源码解析九:分支事务如何注册到全局事务

这四种ResourceManager都继承了AbstractResourceManager,并且都没有重写AbstractResourceManagerbranchRegister()方法,所以无论是哪种全局事务模式,分支事务注册到全局事务的方式都一样,都体现在AbstractResourceManagerbranchRegister()方法中;而分支事务的提交和回滚方式却各不相同。

@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
    try {
        BranchRegisterRequest request = new BranchRegisterRequest();
        // xid是全局事务ID
        request.setXid(xid);
        // 分布式事务要更新数据的全局锁keys
        request.setLockKey(lockKeys);
        // 分支事务对应的资源ID
        request.setResourceId(resourceId);
        // 分支事务类型
        request.setBranchType(branchType);
        // 引用的数据
        request.setApplicationData(applicationData);

        // 将请求通过RmNettyRemotingClient发送到seata-server
        BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));
        }
        return response.getBranchId();
    } catch (TimeoutException toe) {
        throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
    } catch (RuntimeException rex) {
        throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);
    }
}

方法中构建一个BranchRegisterRequest,通过netty将请求发送到TC进行分支事务的注册;

三、TC中处理分支事务注册

【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信一文中,我们聊了Seata Client 如何和Seata Server建立连接、通信;

又在【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么一文中,我们知道了TC(Seata Server)启动之后,AbstractNettyRemotingServer的内部类ServerHandler负责接收并处理请求。

分布式事务Seata源码解析九:分支事务如何注册到全局事务
ServerHandler类上有个@ChannelHandler.Sharable注解,其表示所有的连接都会共用这一个ChannelHandler;所以当消息处理很慢时,会降低并发。

processMessage(ctx, (RpcMessage) msg)方法中会根据消息类型获取到 请求处理组件(消息的处理过程是典型的策略模式),如果消息对应的处理器设置了线程池,则放到线程池中执行;如果对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;所以在seata-server中大部分处理器都有对应的线程池。

/**
 * Rpc message processing.
 *
 * @param ctx        Channel handler context.
 * @param rpcMessage rpc message.
 * @throws Exception throws exception process message error.
 * @since 1.3.0
 */
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
    }
    Object body = rpcMessage.getBody();
    if (body instanceof MessageTypeAware) {
        MessageTypeAware messageTypeAware = (MessageTypeAware) body;
        // 根据消息的类型获取到请求处理组件和请求处理线程池组成的Pair
        final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
        if (pair != null) {
            // 如果消息对应的处理器设置了线程池,则放到线程池中执行
            if (pair.getSecond() != null) {
                try {
                    pair.getSecond().execute(() -> {
                        try {
                            pair.getFirst().process(ctx, rpcMessage);
                        } catch (Throwable th) {
                            LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                        } finally {
                            MDC.clear();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // 线程池拒绝策略之一,抛出异常:RejectedExecutionException
                    LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                        "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                    if (allowDumpStack) {
                        String name = ManagementFactory.getRuntimeMXBean().getName();
                        String pid = name.split("@")[0];
                        long idx = System.currentTimeMillis();
                        try {
                            String jstackFile = idx + ".log";
                            LOGGER.info("jstack command will dump to " + jstackFile);
                            Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
                        } catch (IOException exx) {
                            LOGGER.error(exx.getMessage());
                        }
                        allowDumpStack = false;
                    }
                }
            } else {
                // 对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;
                try {
                    pair.getFirst().process(ctx, rpcMessage);
                } catch (Throwable th) {
                    LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                }
            }
        } else {
            LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
        }
    } else {
        LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
    }
}

Seata Serer接收到请求的执行链路为:
分布式事务Seata源码解析九:分支事务如何注册到全局事务

1、BranchRegisterRequest

又由于RM发送开启事务请求时的RPCMessage的body为BranchRegisterRequest,所以进入到:

分布式事务Seata源码解析九:分支事务如何注册到全局事务

又由于在DefaultCoordinator#onRequest()方法中,将DefaultCoordinator自身绑定到了AbstractTransactionRequestToTChandler属性中:

分布式事务Seata源码解析九:分支事务如何注册到全局事务

所以会进入到:

分布式事务Seata源码解析九:分支事务如何注册到全局事务
DefaultCore封装了AT、TCC、Saga、XA分布式事务模式的具体实现类。

2、DefaultCore执行分支事务的注册

DefaultCore#branchRegister()方法中会首先根据分布式事务模式获取到相应的AbstractCore,这里的处理方式和上面获取分布式事务模式对应的ResourceManager的处理方式一样;

分布式事务Seata源码解析九:分支事务如何注册到全局事务

因为存在四种分布式事务的模式(AT、TCC、XA、SAGA),所以此处也正好对应四种AbstractCore:

分布式事务Seata源码解析九:分支事务如何注册到全局事务

这四种Core都继承了AbstractCore,并且都没有重写AbstractResourceManagerbranchRegister()方法,所以无论是哪种全局事务模式,分支事务注册到全局事务的方式都一样,都体现在AbstractCorebranchRegister()方法中;

@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                           String applicationData, String lockKeys) throws TransactionException {
    // 根据xid从DB中找到全局事务会话(会做一个DB查询操作)
    GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
    return SessionHolder.lockAndExecute(globalSession, () -> {
        // 检查全局事务会话的状态
        globalSessionStatusCheck(globalSession);
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // 分支事务会话  根据全局事务开启一个分支事务
        BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
                applicationData, lockKeys, clientId);
        // 将branchID放到ThreadLocal中
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
        // 给分支事务加全局锁
        branchSessionLock(globalSession, branchSession);
        try {
            // 将分支事务会话添加到全局事务会话
            globalSession.addBranch(branchSession);
        } catch (RuntimeException ex) {
            branchSessionUnlock(branchSession);
            throw new BranchTransactionException(FailedToAddBranch, String
                    .format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
                            branchSession.getBranchId()), ex);
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
                    globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
        }
        return branchSession.getBranchId();
    });
}

方法做的事情如下:

  1. 根据xid从DB / file / redis (由TC的store.mode配置决定)中找到全局事务会话(这里会做一个DB查询操作),并且断言globalSession不许为空;
  2. 校验全局事务会话,全局事务会话必须是存活的,并且状态必须为:GlobalStatus.Begin;
  3. 构建一个分支事务会话BranchSession;给分支事务加全局锁,出现锁冲突则直接报错,抛出异常BranchTransactionException
  4. 将分支事务会话添加到全局事务会话,持久化分支事务会话;

1)获取全局事务会话GlobalSession

分布式事务Seata源码解析九:分支事务如何注册到全局事务
SessionHolder是会话管理者,其中包括四个会话管理器:

// 用于管理所有的Setssion,以及Session的创建、更新、删除等
private static SessionManager ROOT_SESSION_MANAGER;
// 用于管理所有的异步commit的Session,包括创建、更新以及删除
private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
// 用于管理所有的重试commit的Session,包括创建、更新以及删除
private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
// 用于管理所有的重试rollback的Session,包括创建、更新以及删除
private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;

分布式事务Seata源码解析九:分支事务如何注册到全局事务
用于获取全局事务会话的管理器为:ROOT_SESSION_MANAGER;在初始化SessionHolder时,会根据store.mode对其进行赋值:

分布式事务Seata源码解析九:分支事务如何注册到全局事务

例如博主的TC采用的store.mode是DB,所以找到:DataBaseSessionManager;

分布式事务Seata源码解析九:分支事务如何注册到全局事务

DataBaseSessionManager#findGlobalSession()方法如下:

分布式事务Seata源码解析九:分支事务如何注册到全局事务

注意:在AbstractCore#branchRegister()方法中查询全局事务会话时,withBranchSessions = false,所以不会把分支事务查出来。

LogStore

LogStore是对全局事务、分支事务做DB操作的DAO层;LogStore接口只有一个实现LogStoreDataBaseDAO,其queryGlobalTransactionDO()方法内容很简单,就直接使用JDBC查表:

@Override
public GlobalTransactionDO queryGlobalTransactionDO(String xid) {
    String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getQueryGlobalTransactionSQL(globalTable);
    Connection conn = null;
    PreparedStatement ps = null;
    ResultSet rs = null;
    try {
        conn = logStoreDataSource.getConnection();
        conn.setAutoCommit(true);
        ps = conn.prepareStatement(sql);
        ps.setString(1, xid);
        rs = ps.executeQuery();
        if (rs.next()) {
            return convertGlobalTransactionDO(rs);
        } else {
            return null;
        }
    } catch (SQLException e) {
        throw new DataAccessException(e);
    } finally {
        IOUtil.close(rs, ps, conn);
    }
}

本文后面相似的DB操作不再赘述。

2)校验全局事务会话GlobalSession

全局事务会话必须是存活的,并且状态必须为:GlobalStatus.Begin;

protected void globalSessionStatusCheck(GlobalSession globalSession) throws GlobalTransactionException {
    if (!globalSession.isActive()) {
        throw new GlobalTransactionException(GlobalTransactionNotActive, String.format(
            "Could not register branch into global session xid = %s status = %s, cause by globalSession not active",
            globalSession.getXid(), globalSession.getStatus()));
    }
    if (globalSession.getStatus() != GlobalStatus.Begin) {
        throw new GlobalTransactionException(GlobalTransactionStatusInvalid, String
                .format("Could not register branch into global session xid = %s status = %s while expecting %s",
                        globalSession.getXid(), globalSession.getStatus(), GlobalStatus.Begin));
    }
}

3)分支事务会话BranchSession加全局锁

分布式事务Seata源码解析九:分支事务如何注册到全局事务

1> SessionHolder首先构建一个BranchSession

BranchSession的内容包括:全局事务xid、全局事务id、根据雪花算法生成的分支事务id、全局事务模式、RM资源Id、分支事务要加的全局锁keys、RM客户端ID、RM应用名;

public static BranchSession newBranchByGlobal(GlobalSession globalSession, BranchType branchType, String resourceId,
        String applicationData, String lockKeys, String clientId) {
    BranchSession branchSession = new BranchSession();

    branchSession.setXid(globalSession.getXid());
    branchSession.setTransactionId(globalSession.getTransactionId());
    branchSession.setBranchId(UUIDGenerator.generateUUID());
    branchSession.setBranchType(branchType);
    branchSession.setResourceId(resourceId);
    branchSession.setLockKey(lockKeys);
    branchSession.setClientId(clientId);
    branchSession.setApplicationData(applicationData);

    return branchSession;
}

2> branchID放入ThreadLocal

MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));

3> *给分支事务加全局锁

分布式事务Seata源码解析九:分支事务如何注册到全局事务

AbstractCore中的branchSessionLock()方法没有具体的实现,并且在Seata中只有AT模式有全局锁的概念,因此只需要看ATCore的branchSessionLock()方法即可;

分布式事务Seata源码解析九:分支事务如何注册到全局事务

默认情况下seata在给分支事务加全局锁的同时,会检查全局锁是否冲突;

分布式事务Seata源码解析九:分支事务如何注册到全局事务

LockStoreDataBaseDAO#acquireLock()获取全局锁

LockStoreDataBaseDAO#acquireLock()方法如下:

@Override
public boolean acquireLock(List<LockDO> lockDOs, boolean autoCommit, boolean skipCheckLock) {
    Connection conn = null;
    PreparedStatement ps = null;
    ResultSet rs = null;
    // 已经存在的行锁key集合
    Set<String> dbExistedRowKeys = new HashSet<>();
    boolean originalAutoCommit = true;
    if (lockDOs.size() > 1) {
        lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());
    }
    try {
        // 从全局锁数据源里获取到一个连接
        conn = lockStoreDataSource.getConnection();
        // 把自动提交事务关闭
        if (originalAutoCommit = conn.getAutoCommit()) {
            conn.setAutoCommit(false);
        }
        List<LockDO> unrepeatedLockDOs = lockDOs;

        //check lock
        // 是否跳过锁检查
        if (!skipCheckLock) {

            boolean canLock = true;
            //query,针对全局锁表查询某个数据
            String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, lockDOs.size());
            ps = conn.prepareStatement(checkLockSQL);
            for (int i = 0; i < lockDOs.size(); i++) {
                ps.setString(i + 1, lockDOs.get(i).getRowKey());
            }
            rs = ps.executeQuery();
            // 获取到当前要加全局锁的事务xid
            String currentXID = lockDOs.get(0).getXid();
            boolean failFast = false;
            // 查询结果为空时,说明没有事务加全局锁
            while (rs.next()) {
                String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);
                // 如果加全局锁的是其他的全局事务xid
                if (!StringUtils.equals(dbXID, currentXID)) {
                    if (LOGGER.isInfoEnabled()) {
                        String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK);
                        String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME);
                        long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID);
                        LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID, dbBranchId);
                    }
                    if (!autoCommit) {
                        int status = rs.getInt(ServerTableColumnsName.LOCK_TABLE_STATUS);
                        if (status == LockStatus.Rollbacking.getCode()) {
                            failFast = true;
                        }
                    }
                    canLock = false;
                    break;
                }

                dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY));
            }

            // 不可以加全局锁,全局锁已经被其他事务占用
            if (!canLock) {
                conn.rollback();
                if (failFast) {
                    throw new StoreException(new BranchTransactionException(LockKeyConflictFailFast));
                }
                return false;
            }
            // If the lock has been exists in db, remove it from the lockDOs
            if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {
                unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey()))
                        .collect(Collectors.toList());
            }
            if (CollectionUtils.isEmpty(unrepeatedLockDOs)) {
                conn.rollback();
                return true;
            }
        }

        // lock
        if (unrepeatedLockDOs.size() == 1) {
            LockDO lockDO = unrepeatedLockDOs.get(0);
            // 加全局锁
            if (!doAcquireLock(conn, lockDO)) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk());
                }
                conn.rollback();
                return false;
            }
        } else {
            // 批量加全局锁
            if (!doAcquireLocks(conn, unrepeatedLockDOs)) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(),
                        unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList()));
                }
                conn.rollback();
                return false;
            }
        }
        conn.commit();
        return true;
    } catch (SQLException e) {
        throw new StoreException(e);
    } finally {
        IOUtil.close(rs, ps);
        if (conn != null) {
            try {
                if (originalAutoCommit) {
                    conn.setAutoCommit(true);
                }
                conn.close();
            } catch (SQLException e) {
            }
        }
    }
}

增加全局行锁、检查全局锁冲突的逻辑如下:

  1. 先对要加的全局行锁去重,然后关闭数据库连接的自动提交;

  2. 如果跳过了全局锁冲突检查,则直接持久化全局行锁,然后提交全局锁数据持久化事务;

  3. 如果需要进行全局锁冲突检查:

    1. 首先根据分支事务传入的全局行锁构建查询全局锁的SQL;
      分布式事务Seata源码解析九:分支事务如何注册到全局事务

      • SQL模板(要上几个行锁,in后面就有几个?)

        select xid, transaction_id, branch_id, resource_id, table_name, pk, row_key, gmt_create, gmt_modified,status from lock_table where row_key in ( ? ) order by status desc 
        
      • 行锁的key值(数据库连接URL + 表明 + 主键id)

        jdbc:mysql://127.0.0.1:3306/seata_stock^^^stock_tbl^^^1
        
    2. 如果根据查询全局行锁SQL没有从DB中查出记录,说明没有其他事务加当前分支事务所需要的全局行锁;

      则直接持久化全局行锁,然后提交全局锁数据持久化事务;

    3. 如果根据查询全局行锁SQL从DB中查出了记录,并且加全局锁的全局事务xid不是当前全局事务的,则说明全局锁已经被其他全局事务占用;

      进而回滚当前提交全局锁数据持久化事务,返回false,表示加全局锁失败;

      方法返回到ATCore#branchSessionLock()方法中,如果加全局锁失败,则直接抛出异常BranchTransactionException

所谓的加全局锁操作,其实就是针对每一行记录 持久化一条行锁记录到lock_table表中:

protected boolean doAcquireLock(Connection conn, LockDO lockDO) {
    PreparedStatement ps = null;
    try {
        //insert
        String insertLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getInsertLockSQL(lockTable);
        ps = conn.prepareStatement(insertLockSQL);
        // 全局事务xid
        ps.setString(1, lockDO.getXid());
        ps.setLong(2, lockDO.getTransactionId());
        // 分支事务ID
        ps.setLong(3, lockDO.getBranchId());
        ps.setString(4, lockDO.getResourceId());
        ps.setString(5, lockDO.getTableName());
        // 主键
        ps.setString(6, lockDO.getPk());
        // rowKey
        ps.setString(7, lockDO.getRowKey());
        // 锁状态:Locked(已加锁)
        ps.setInt(8, LockStatus.Locked.getCode());
        return ps.executeUpdate() > 0;
    } catch (SQLException e) {
        throw new StoreException(e);
    } finally {
        IOUtil.close(ps);
    }
}

4)分支事务添加到全局事务

分布式事务Seata源码解析九:分支事务如何注册到全局事务

1> 持久化分支事务

将分支事务注册到全局事务之后,会触发Session生命周期监听器SessionLifecycleListeneronAddBranch()事件;

// AbstractSessionManager 将 分支事务会话持久化到DB中
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
    lifecycleListener.onAddBranch(this, branchSession);
}

在此处,SessionLifecycleListener只有一个实现AbstractSessionManager:

分布式事务Seata源码解析九:分支事务如何注册到全局事务
DataBaseTransactionStoreManager是store.mode为db时的事务存储管理器,其writeSession()方法负责持久化全局事务、分支事务;

@Override
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
    if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
        return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
        return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
        return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
        return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
        return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
        return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else {
        throw new StoreException("Unknown LogOperation:" + logOperation.name());
    }
}

前面也提到过LogStore可以看做是封装了JDBC、操作DB的工具类 / DAO层,其只有一个实现LogStoreDataBaseDAO;

@Override
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
    String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
    Connection conn = null;
    PreparedStatement ps = null;
    try {
        int index = 1;
        conn = logStoreDataSource.getConnection();
        conn.setAutoCommit(true);
        ps = conn.prepareStatement(sql);
        ps.setString(index++, globalTransactionDO.getXid());
        ps.setLong(index++, globalTransactionDO.getTransactionId());
        ps.setInt(index++, globalTransactionDO.getStatus());
        ps.setString(index++, globalTransactionDO.getApplicationId());
        ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());
        String transactionName = globalTransactionDO.getTransactionName();
        transactionName = transactionName.length() > transactionNameColumnSize ?
            transactionName.substring(0, transactionNameColumnSize) :
        transactionName;
        ps.setString(index++, transactionName);
        ps.setInt(index++, globalTransactionDO.getTimeout());
        ps.setLong(index++, globalTransactionDO.getBeginTime());
        ps.setString(index++, globalTransactionDO.getApplicationData());
        return ps.executeUpdate() > 0;
    } catch (SQLException e) {
        throw new StoreException(e);
    } finally {
        IOUtil.close(ps, conn);
    }
}

这里只是单纯的利用JDBC对数据做持久化,将数据持久化到branch_table

2> JVM层面分支事务添加到全局事务

分布式事务Seata源码解析九:分支事务如何注册到全局事务

四、总结

无论是AT、TCC、XA、SAGA哪种分布式事务模式,分支事务注册到全局事务的方式都一样;

TC中对RM注册分支事务到全局事务的处理逻辑为:

  1. 首先根据xid从DB中找到全局事务会话(这里会做一个DB查询操作);
  2. 校验全局事务会话,全局事务会话必须是存活的,并且状态必须为:GlobalStatus.Begin;
  3. 构建一个分支事务会话BranchSession;给分支事务加全局锁,出现锁冲突则直接报错,抛出异常BranchTransactionException
  4. 将分支事务会话添加到全局事务会话,持久化分支事务会话;

TC在分支事务注册的同时,会同时增加全局行锁、检查全局锁冲突。