今天要讨论的是“Java实现多线程单条数据事务管理”,在此之前,顺便回顾一下实现多线程的几种方式
实现多线程的三种方式
一、继承Thread类
第一种方法是继承Thread类,重写run()方法
1
2
3
4
5
|
public class TestThread extends Thread {
public void run() {
System.out.println( "继承Thread类,重写run方法" );
}
}
|
使用时,new一个实例,执行start()方法
1
2
3
4
|
TestThread testThread1 = new TestThread(); // 新建状态
TestThread testThread2 = new TestThread(); // 新建状态
testThread1.start(); // 就绪状态
testThread2.start(); // 就绪状态
|
何时执行取决于cpu调度
二、实现Runnable接口
因为Java“单继承、多实现”的特性,当我们已经继承了一个类的时候,则无法再继承Thread类,此时可以通过实现Runnable接口的方式,实现run()方法
1
2
3
4
5
|
public class TestThread extends FatherClass implements Runnable {
public void run() {
System.out.println( "实现Runnable接口的方式,实现run方法" );
}
}
|
Thread类也是实现Runnable接口
使用时,需要首先实例化一个Thread,并传入自己的TestThread实例
1
2
3
|
TestThread testThread = new TestThread();
Thread thread = new Thread(testThread);
thread.start();
|
三、实现Callable和Future接口
该方法区别于前两种的特点是:能够获得线程处理的结果。因此该方式适用于需要对线程的结果进行处理的场景
1
2
3
4
5
6
7
8
9
10
11
12
|
class TestCallable implements Callable<Integer> {
@Override
public Integer call() {
int sum = 0 ;
for ( int i = 0 ; i < 100 ; i++) {
System.out.println(Thread.currentThread().getName() + " " + i);
sum += i;
}
return sum;
}
}
|
使用时,先创建TestCallable对象,然后使用FutureTask来包装MyCallable对象,再将FutureTask对象作为Thread对象的target创建新的线程,最后thread执行start()方法,线程进入就绪状态
1
2
3
4
|
Callable<Integer> testCallable = new TestCallable(); // 创建TestCallable对象
FutureTask<Integer> futureTask = new FutureTask<Integer>(testCallable); // 使用FutureTask来包装MyCallable对象
Thread thread = new Thread(futureTask); // FutureTask对象作为Thread对象的target创建新的线程
thread.start();
|
多线程单条数据事务管理
我们有时会遇到这样的场景:要对大批量的数据进行更新或插入操作,需要开启多线程来提高效率,又希望每个线程在的处理一批数据时,能够对其中每条数据进行处理的时,做到出错时实现单条数据回滚,而不是所有数回滚(所有数据回滚后续讨论)。先看代码:
根据以上多线程知识,我们先定义一个业务线程类如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
public class TestTranstionalThread extends Thread {
private List<BalBankDictEntity> balBankDictEntities;
public TestTranstionalThread( List<BalBankDictEntity> balBankDictEntities){
this .balBankDictEntities = balBankDictEntities;
}
@Override
public void run() {
log.info( "线程{}开始" ,Thread.currentThread().getName());
for (BalBankDictEntity balBankDictEntity : balBankDictEntities) {
try {
collBillDao.insOneBank(balBankDictEntity);
} catch (BusiException e){
log.error( "{}回滚" ,balBankDictEntity.getBankId());
}
}
log.info( "线程{}结束" ,Thread.currentThread().getName());
}
}
|
insOneBank()方法如下,注意的@Transactional注解的事务隔离等级为:REQUIRES_NEW,创建一个新的事务。
1
2
3
4
5
6
7
8
9
10
|
@Transactional (propagation = Propagation.REQUIRES_NEW)
public void insOneBank(BalBankDictEntity balBankDictEntity){
balBankDictMapper.insert(balBankDictEntity);
/* 模拟发生异常,抛出异常,实现将已插入数据回滚 */
if (Integer.parseInt(balBankDictEntity.getBankId().substring( 2 )) % 100 == 0 ){
throw new BusiException( "test" );
}
}
|
开启多线程进行业务处理,注意加上@Transactional注解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
@Transactional
public void testTransactional(){
/* 模拟测试数据 */
List<BalBankDictEntity> balBankDictEntities = new ArrayList<>();
for (int i = 0 ; i < 100000 ; i ++){
BalBankDictEntity balBankDictEntity = new BalBankDictEntity();
balBankDictEntity.setBankCode("BK" + i);
balBankDictEntity.setBankId("ID" + i + "");
balBankDictEntity.setBankName("N" + i + "N");
balBankDictEntities.add(balBankDictEntity);
}
int totalNum = balBankDictEntities.size();
log.info("totalNum" + totalNum);
/* 分10个线程处理 */
ExecutorService fixedThreadPool = Executors.newFixedThreadPool( 10 );
int dealNum = totalNum % 10 == 0 ? totalNum / 10 : totalNum / 10 + 1 ; // 计算每个线程处理的数量
for ( int i = 1 ; i <= 10 ; i++ ){
List<BalBankDictEntity> balBankDictEntityList = splitDataList(balBankDictEntities,dealNum, 10 ,i); // 切割数据集实现数据隔离
TestTranstionalThread testTranstional = new TestTranstionalThread(balBankDictEntityList);
fixedThreadPool.execute(testTranstional);
}
}
|
最终实现多个线程并发插入数据,有异常的数据的单独回滚,不影响整体
到此这篇关于Java多线程事务管理的实现的文章就介绍到这了,更多相关Java多线程事务管理内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://www.cnblogs.com/acelin/p/15003247.html