JTA 使用 MySQL 分布式事务

时间:2022-10-06 03:49:17

假定在MySQL实例1上有表

-- Example table; per the surrounding text the same table exists on both MySQL instances.
create table person(
id int,
name varchar(32)
)

MySQL 实例2上也有一张结构相同的表。现在要从实例1的 person 表中删除一条数据,并把这条数据插入到实例2的 person 表中。这两个操作必须在同一个事务中完成;由于跨越了两个数据库实例,因此需要使用分布式事务。

MySQL实现了分布式事务,查看数据库是否启用了 XA 事务:

-- Displays the innodb_support_xa server variable to check whether XA support is enabled.
-- NOTE(review): this variable is believed to be deprecated in MySQL 5.7.10 and removed in 8.0
-- (where XA support is always on and this query returns an empty set) -- confirm for your server version.
show variables like 'innodb_support_xa';

MySQL 关于xa的命令:

-- Start an XA transaction branch identified by xid 'a'.
xa start 'a';
-- Placeholder for the SQL statements executed inside the branch.
sql 语句;
-- Mark the end of the work performed under xid 'a'.
xa end 'a';
-- Phase one: ask whether the branch is ready to commit.
xa prepare 'a';
-- Phase two: commit the prepared branch.
xa commit 'a';

与普通事务相比,XA 事务在提交之前多了一个 prepare 阶段:事务管理器先询问各个资源是否已准备好提交,再根据所有 prepare 的返回结果决定统一提交还是统一回滚。

以上命令展示了 XA 事务的操作步骤;但如果只是在单个命令行会话中对一个实例执行这些命令,并不构成真正的分布式事务。可以使用 JTA 在 Java 程序中同时控制多个 MySQL 实例的 XA 事务:

/**
 * Demonstrates a hand-driven two-phase-commit (XA) transaction that spans two
 * MySQL instances: a row is deleted from {@code person} on the first instance
 * and inserted into {@code person} on the second one, and both changes are
 * committed or rolled back together.
 *
 * <p>This class plays the role of a very small transaction manager. Unlike a
 * real one it keeps no recovery log, so a branch that is left prepared after a
 * failed commit is only reported and has to be resolved manually.
 */
public class JTA_MySQL {

    /** Branch not started yet, or already completed (committed / rolled back). */
    private static final int BRANCH_NONE = 0;
    /** {@code start} succeeded but {@code end} has not been called yet. */
    private static final int BRANCH_ACTIVE = 1;
    /** {@code end} succeeded; the branch is idle or prepared and can be rolled back directly. */
    private static final int BRANCH_ENDED = 2;

    /**
     * Runs the demo transaction against the two hard-coded MySQL instances.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Demo-only endpoints and credentials; do not hard-code these in real code.
        XADataSource xaDs1 = getDataSource(
                "jdbc:mysql://172.30.60.126:3306/db_zhang", "root", "root");
        XADataSource xaDs2 = getDataSource(
                "jdbc:mysql://172.30.60.124:3306/db_zhang", "root", "root");

        // Same format id and global transaction id, different branch qualifiers:
        // two branches of one global transaction.
        Xid xid1 = new MyXid(100, new byte[] { 0x01 }, new byte[] { 0x02 });
        Xid xid2 = new MyXid(100, new byte[] { 0x01 }, new byte[] { 0x03 });

        XAConnection xaCon1 = null;
        XAConnection xaCon2 = null;
        XAResource xaRes1 = null;
        XAResource xaRes2 = null;
        Connection conn1 = null;
        Connection conn2 = null;
        Statement stmt1 = null;
        Statement stmt2 = null;

        int state1 = BRANCH_NONE;
        int state2 = BRANCH_NONE;
        // Once the commit decision is taken, branches must never be rolled back
        // any more, otherwise one side could commit while the other rolls back.
        boolean commitDecided = false;

        try {
            xaCon1 = getXAConnetion(xaDs1);
            conn1 = (xaCon1 == null) ? null : getConnection(xaCon1);
            xaCon2 = getXAConnetion(xaDs2);
            conn2 = (xaCon2 == null) ? null : getConnection(xaCon2);
            if (conn1 == null || conn2 == null) {
                // The helpers already printed the cause; nothing was started yet.
                System.out.println("获取数据库连接失败,分布式事务未执行");
                return;
            }

            stmt1 = conn1.createStatement();
            xaRes1 = xaCon1.getXAResource();
            stmt2 = conn2.createStatement();
            xaRes2 = xaCon2.getXAResource();

            // Branch 1: delete the row on the first instance.
            xaRes1.start(xid1, XAResource.TMNOFLAGS);
            state1 = BRANCH_ACTIVE;
            stmt1.execute("delete from person where id=1");
            xaRes1.end(xid1, XAResource.TMSUCCESS);
            state1 = BRANCH_ENDED;

            // Branch 2: insert the row on the second instance.
            xaRes2.start(xid2, XAResource.TMNOFLAGS);
            state2 = BRANCH_ACTIVE;
            stmt2.execute("insert into person select 1, 'zhang'");
            xaRes2.end(xid2, XAResource.TMSUCCESS);
            state2 = BRANCH_ENDED;

            // Phase one. XA_RDONLY means the branch is already complete and
            // must be neither committed nor rolled back afterwards.
            int ret1 = xaRes1.prepare(xid1);
            if (ret1 == XAResource.XA_RDONLY) {
                state1 = BRANCH_NONE;
            }
            int ret2 = xaRes2.prepare(xid2);
            if (ret2 == XAResource.XA_RDONLY) {
                state2 = BRANCH_NONE;
            }

            boolean vote1 = ret1 == XAResource.XA_OK || ret1 == XAResource.XA_RDONLY;
            boolean vote2 = ret2 == XAResource.XA_OK || ret2 == XAResource.XA_RDONLY;

            // Phase two.
            if (vote1 && vote2) {
                commitDecided = true;
                if (state1 == BRANCH_ENDED) {
                    xaRes1.commit(xid1, false);
                    state1 = BRANCH_NONE;
                }
                if (state2 == BRANCH_ENDED) {
                    xaRes2.commit(xid2, false);
                    state2 = BRANCH_NONE;
                }
                System.out.println("提交分布式事务");
            } else {
                rollbackBranch(xaRes1, xid1, state1);
                state1 = BRANCH_NONE;
                rollbackBranch(xaRes2, xid2, state2);
                state2 = BRANCH_NONE;
                System.out.println("回退分布式事务");
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (XAException e) {
            e.printStackTrace();
        } finally {
            if (!commitDecided) {
                // A failure before the commit decision: undo whatever was started
                // so no branch stays active or prepared on the server.
                if (state1 != BRANCH_NONE || state2 != BRANCH_NONE) {
                    rollbackBranch(xaRes1, xid1, state1);
                    rollbackBranch(xaRes2, xid2, state2);
                    System.out.println("回退分布式事务");
                }
            } else if (state1 != BRANCH_NONE || state2 != BRANCH_NONE) {
                // Commit failed half-way; rolling back now could break atomicity.
                System.out.println("分布式事务提交未完成,存在未决分支,需通过 XA RECOVER 人工处理");
            }
            closeStatement(stmt1);
            closeStatement(stmt2);
            closeConnection(conn1);
            closeConnection(conn2);
            closeXAConnection(xaCon1);
            closeXAConnection(xaCon2);
        }
    }

    /**
     * Rolls back one transaction branch without propagating failures.
     *
     * @param res   resource owning the branch; may be null when {@code state} is {@link #BRANCH_NONE}
     * @param xid   id of the branch
     * @param state one of the {@code BRANCH_*} constants describing how far the branch got
     */
    private static void rollbackBranch(XAResource res, Xid xid, int state) {
        if (state == BRANCH_NONE || res == null) {
            return;
        }
        try {
            if (state == BRANCH_ACTIVE) {
                // An active branch has to be ended before it can be rolled back.
                res.end(xid, XAResource.TMFAIL);
            }
            res.rollback(xid);
        } catch (XAException e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds an XA-capable data source for the given MySQL endpoint.
     *
     * @param url      JDBC URL of the MySQL instance
     * @param user     database user
     * @param password database password
     * @return the configured data source
     */
    private static XADataSource getDataSource(String url, String user,
            String password) {
        MysqlXADataSource dataSource = new MysqlXADataSource();
        dataSource.setUrl(url);
        dataSource.setUser(user);
        dataSource.setPassword(password);
        return dataSource;
    }

    /**
     * Opens an XA connection from the data source. The misspelled method name
     * is kept for compatibility with existing callers.
     *
     * @param dataSource source to connect through
     * @return the XA connection, or {@code null} if it could not be opened
     *         (the cause is printed to standard error)
     */
    public static XAConnection getXAConnetion(XADataSource dataSource) {
        try {
            return dataSource.getXAConnection();
        } catch (SQLException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Obtains the JDBC connection handle of an XA connection.
     *
     * @param XAConn XA connection to take the handle from
     * @return the connection, or {@code null} if it could not be obtained
     *         (the cause is printed to standard error)
     */
    public static Connection getConnection(XAConnection XAConn) {
        try {
            return XAConn.getConnection();
        } catch (SQLException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Closes a connection, reporting (not throwing) a failure.
     *
     * @param conn connection to close; {@code null} is ignored
     */
    public static void closeConnection(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (SQLException e) {
            System.out.println("连接关闭失败");
        }
    }

    /** Closes a statement if it was created, reporting (not throwing) a failure. */
    private static void closeStatement(Statement stmt) {
        if (stmt == null) {
            return;
        }
        try {
            stmt.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /** Closes an XA connection if it was opened, reporting (not throwing) a failure. */
    private static void closeXAConnection(XAConnection xaConn) {
        if (xaConn == null) {
            return;
        }
        try {
            xaConn.close();
        } catch (SQLException e) {
            System.out.println("连接关闭失败");
        }
    }
}

MyXid 类:

/**
 * Simple {@link Xid} implementation holding a format id, a global transaction
 * id and a branch qualifier.
 *
 * <p>The byte arrays are copied on the way in and on the way out, so the
 * identity of a transaction branch cannot change after construction. Two
 * instances are equal when all three components are equal.
 */
public class MyXid implements Xid {
    private final int formatId;
    private final byte[] globalTid;
    private final byte[] branchQ;

    /**
     * @param formatId  format identifier
     * @param globalTid global transaction id; copied, may be {@code null}
     * @param branchQ   branch qualifier; copied, may be {@code null}
     */
    public MyXid(int formatId, byte[] globalTid, byte[] branchQ) {
        this.formatId = formatId;
        this.globalTid = copyOf(globalTid);
        this.branchQ = copyOf(branchQ);
    }

    /** @return a copy of the branch qualifier, or {@code null} if none was given */
    @Override
    public byte[] getBranchQualifier() {
        return copyOf(this.branchQ);
    }

    /** @return the format identifier */
    @Override
    public int getFormatId() {
        return this.formatId;
    }

    /** @return a copy of the global transaction id, or {@code null} if none was given */
    @Override
    public byte[] getGlobalTransactionId() {
        return copyOf(this.globalTid);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof MyXid)) {
            return false;
        }
        MyXid other = (MyXid) obj;
        return this.formatId == other.formatId
                && java.util.Arrays.equals(this.globalTid, other.globalTid)
                && java.util.Arrays.equals(this.branchQ, other.branchQ);
    }

    @Override
    public int hashCode() {
        int result = this.formatId;
        result = 31 * result + java.util.Arrays.hashCode(this.globalTid);
        result = 31 * result + java.util.Arrays.hashCode(this.branchQ);
        return result;
    }

    /** Null-tolerant array copy, so callers that pass {@code null} keep working. */
    private static byte[] copyOf(byte[] src) {
        return (src == null) ? null : src.clone();
    }
}