mybatis-plus批量操作分析
- 代码一:批量更新 updateBatchById
- 步骤1:基本参数
- 步骤2、更新操作
- 步骤3、批量执行逻辑
- 步骤4、执行提交事务
- 代码二:批量保存或更新 saveOrUpdateBatch
- 方法1:查询逻辑(判断是否存在数据)和更新逻辑
- 方法2:整体判断逻辑
- 结尾
代码一:批量更新 updateBatchById
mybatis-plus的批量更新方法updateBatchById主要有以下步骤。下面我们开始逐步分析,为了方便理解,我会给代码加一些注解:
步骤1:基本参数
我们需要传入两个参数:需要更新的集合 entityList 以及 每次触发预插入的数量batchSize。
public boolean updateBatchById(Collection<T> entityList, int batchSize) {
String sqlStatement = getSqlStatement(SqlMethod.UPDATE_BY_ID); //注意点一
return executeBatch(entityList, batchSize, (sqlSession, entity) -> {
MapperMethod.ParamMap<T> param = new MapperMethod.ParamMap<>();
param.put(Constants.ENTITY, entity);
sqlSession.update(sqlStatement, param);
});
}
以下是两个注意点
获取对应的Mapper方法,debug中可以看出sqlStatement为 ,猜测底层逻辑是反射找到对应的方法然后执行。
String sqlStatement = getSqlStatement(SqlMethod.UPDATE_BY_ID);
在IService中重载了updateBatchById方法并定义了batchSize默认大小为1000。所以我们平时在Service层中调用的是IService的重载方法。如下代码
int DEFAULT_BATCH_SIZE = 1000;
@Transactional(rollbackFor = Exception.class)
default boolean updateBatchById(Collection<T> entityList) {
return updateBatchById(entityList, DEFAULT_BATCH_SIZE);
}
步骤2、更新操作
可以从上面看到一个executeBatch方法的参数中,传入了一个消费者 BiConsumer。该消费者主要作用是获取参数,执行对应方法。可以看作是单条数据的更新操作。
(sqlSession, entity) -> {
MapperMethod.ParamMap<T> param = new MapperMethod.ParamMap<>();
param.put(Constants.ENTITY, entity); //更新方法参数
sqlSession.update(sqlStatement, param);
}
步骤3、批量执行逻辑
点击executeBatch方法,可以看到里面调了方法。所以我们直接来看再来看SqlHelper的executeBatch方法里是什么逻辑。
首先,简单判断了batchSize要 >= 1,以及传入的集合需要不为空。
然后又是一个传入的消费者Consumer参数,执行的操作为:在遍历我们传入的集合,执行上述的步骤 2 中的消费者方法(更新操作)。并且每次循环到一定次数就先预插入,而这个次数就是取的集合的大小和 batchSize 的最小值。
public static <E> boolean executeBatch(Class<?> entityClass, Log log, Collection<E> list,
int batchSize, BiConsumer<SqlSession, E> consumer) {
Assert.isFalse(batchSize < 1, "batchSize must not be less than one");
//!(list) 如果为false,无论&&后面代码执不执行,整体一定为false
return !CollectionUtils.isEmpty(list) && executeBatch(entityClass, log, sqlSession -> {
int size = list.size();
int idxLimit = Math.min(batchSize, size); //取最小值,集合全部遍历完 或 达到传入的batchSize
int i = 1;
for (E element : list) {
consumer.accept(sqlSession, element); //步骤 2 更新操作
if (i == idxLimit) {
sqlSession.flushStatements(); //预插入
idxLimit = Math.min(idxLimit + batchSize, size); //重新取值
}
i++;
}
});
}
步骤4、执行提交事务
先是通过实体对应的Class类,获取到对应的工厂类 sqlSessionFactory,这里还判断了工厂中是否还存在未提交的sqlSession,存在则进行提交。使用工厂类创建了sqlSession,执行了消费者(上述步骤2、3),最后调用commit()进行提交。
@SneakyThrows
public static boolean executeBatch(Class<?> entityClass, Log log, Consumer<SqlSession> consumer) {
SqlSessionFactory sqlSessionFactory = sqlSessionFactory(entityClass); //获取工厂
SqlSessionHolder sqlSessionHolder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sqlSessionFactory);
boolean transaction = TransactionSynchronizationManager.isSynchronizationActive();
if (sqlSessionHolder != null) {
SqlSession sqlSession = sqlSessionHolder.getSqlSession();
//原生无法支持执行器切换,当存在批量操作时,会嵌套两个session的,优先commit上一个session
//按道理来说,这里的值应该一直为false。
sqlSession.commit(!transaction);
}
SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH);
if (!transaction) {
log.warn("SqlSession [" + sqlSession + "] Transaction not enabled");
}
try {
consumer.accept(sqlSession); //执行步骤3操作
//非事务情况下,强制commit。
sqlSession.commit(!transaction); //commit提交
return true;
} catch (Throwable t) {
sqlSession.rollback();
Throwable unwrapped = ExceptionUtil.unwrapThrowable(t);
if (unwrapped instanceof PersistenceException) {
MyBatisExceptionTranslator myBatisExceptionTranslator
= new MyBatisExceptionTranslator(sqlSessionFactory.getConfiguration().getEnvironment().getDataSource(), true);
Throwable throwable = myBatisExceptionTranslator.translateExceptionIfPossible((PersistenceException) unwrapped);
if (throwable != null) {
throw throwable;
}
}
throw ExceptionUtils.mpe(unwrapped);
} finally {
sqlSession.close();
}
}
代码二:批量保存或更新 saveOrUpdateBatch
批量保存或更新步骤和批量更新步骤类似,这里不再重复讲述,只说下两个方法。
方法1:查询逻辑(判断是否存在数据)和更新逻辑
@Transactional(rollbackFor = Exception.class)
@Override
public boolean saveOrUpdateBatch(Collection<T> entityList, int batchSize) {
TableInfo tableInfo = TableInfoHelper.getTableInfo(entityClass); //获取数据库表名称
Assert.notNull(tableInfo, "error: can not execute. because can not find cache of TableInfo for entity!");
String keyProperty = tableInfo.getKeyProperty(); //表主键
Assert.notEmpty(keyProperty, "error: can not execute. because can not find column for id from entity!");
return SqlHelper.saveOrUpdateBatch(this.entityClass, this.mapperClass, this.log, entityList, batchSize,
(sqlSession, entity) -> {
Object idVal = tableInfo.getPropertyValue(entity, keyProperty);
return StringUtils.checkValNull(idVal)
|| CollectionUtils.isEmpty(sqlSession.selectList(getSqlStatement(SqlMethod.SELECT_BY_ID), entity));
}, (sqlSession, entity) -> {
MapperMethod.ParamMap<T> param = new MapperMethod.ParamMap<>();
param.put(Constants.ENTITY, entity);
sqlSession.update(getSqlStatement(SqlMethod.UPDATE_BY_ID), param);
});
}
获取对应的表名称,没有@TableName就根据驼峰规则来转。
TableInfo tableInfo = (entityClass);
获取表的主键,@TableId标识的属性。
String keyProperty = ();
查询逻辑,检查主键是否为空,然后执行方法,并且调用的Mapper方法是 SqlMethod.SELECT_BY_ID。
(sqlSession, entity) -> {
Object idVal = tableInfo.getPropertyValue(entity, keyProperty); //对应主键属性的值
return StringUtils.checkValNull(idVal)
|| CollectionUtils.isEmpty(sqlSession.selectList(getSqlStatement(SqlMethod.SELECT_BY_ID), entity));
//判断对应select结果是否为空
}
更新逻辑,如果上述的查询逻辑中返回 false,则执行更新逻辑。
(sqlSession, entity) -> {
MapperMethod.ParamMap<T> param = new MapperMethod.ParamMap<>();
param.put(Constants.ENTITY, entity);
sqlSession.update(getSqlStatement(SqlMethod.UPDATE_BY_ID), param);
}
方法2:整体判断逻辑
参数predicate对应的就是上述的查询逻辑,consumer对应的就是更新逻辑。如果查询逻辑执行结果为true,则执行insert方法。后续executeBatch同updateBatchById。
/**
* entityClass 实体类
* mapper Mapper类
* predicate 查询操作
* consumer 更新操作
*/
public static <E> boolean saveOrUpdateBatch(Class<?> entityClass, Class<?> mapper, Log log, Collection<E> list, int batchSize,
BiPredicate<SqlSession, E> predicate, BiConsumer<SqlSession, E> consumer) {
String sqlStatement = getSqlStatement(mapper, SqlMethod.INSERT_ONE);
return executeBatch(entityClass, log, list, batchSize, (sqlSession, entity) -> {
if (predicate.test(sqlSession, entity)) {
sqlSession.insert(sqlStatement, entity);
} else {
consumer.accept(sqlSession, entity);
}
});
}
结尾
我们知道mybatis-plus提供的批量操作只支持了主键更新,没法通过自己自定义的参数条件来更新。如果你看懂了上述讲的Mybatis-Plus的代码,那么写一个通过自己自定义参数的批量更新操作和批量保存操作还是很简单的。下篇博客我将手动写一个能够自定义更新的批量更新操作和自定义保存或更新的批量操作,以及讲解代码中的细节和自己遇到的问题。
如果你看了本文有所收获,请给我点赞。