前言
最近在写爬虫的时候,需要定时的将数据爬取然后导入到数据库中(数据量有点大哦),我最开始的写法是这样的,每爬取到一条数据就立即将数据入库,IO了好多次,这样在无形之中给数据库施压了,唉我这个猪队友…不是还有一个叫做批处理的东西存在嘛!!!于是我用批处理的技术优化了一下代码,顺便研究了一波批处理流程的源码,这也是我写下本文的原因嘻嘻,下文将会按照如下流程来介绍批处理(包含了一丢丢源码分析)。SQL层面->JDBC批处理->mybatis批处理->mybatisPlus批处理。读者可以根据自己的意愿自行读取相应部分
SQL层面来实现批处理
把要插入的数据一条条的进行SQL拼接,拼接完成后,执行一次最终的那条SQL就可以将批量的数据导入到数据库中,如果我们不嫌麻烦在开发我们的项目的时候,利用SQL拼接的思想,亦可以轻松的实现批处理,其实 mybatis 也是用的这个思想
insert into user(name,age) values ("zzh",18) ,("zzh",21)......;
JDBC的批处理技术
如果用JDBC来实现批处理的话,大致的几个步骤如下。下面的代码演示了,向数据库中插入俩条name=zzh,age=21的数据
Class.forName(driver);
Connection conn = DriverManager.getConnection(url, username, password);
conn.setAutoCommit(false);
String insert = "insert into user(name,age) values(?,?)";
PreparedStatement ps = conn.prepareStatement(insert);
ps.setObject(1, "zzh");
ps.setObject(2, "21");
//每一次的addBatch操作就相当于SQL拼接
ps.addBatch();
ps.addBatch();
//执行批处理
ps.executeBatch();
conn.commit();
mybatis的批处理技术(重头戏)
为了向读者叙述的更加全面,打算以 Executor 接口为起点开始叙述,直到让读者体会到 Mapper 是如何实现批处理的。其实我们在调用各种 Mapper 接口进行CRUD 的时候,底层真正干活却是 Executor 接口。如果读者对 mybatis 的源码感兴趣,可以阅读 深入mybatis源码解读~手把手带你debug分析源码 本文不在过多叙述 Executor 的作用。
说了这么多,mybatis是如何执行批处理的呢?简单的一个例子带大家来体验一下,下面我把如下配置注释掉,不使用通过配置文件来配置数据源的方式,而是采用最原始的手动注入的方式来写测试
测试代码
private Configuration configuration;
private JdbcTransaction jdbcTransaction;
private Connection connection;
private Reader resourceAsReader;
private SqlSessionFactory sqlSessionFactory;
@SneakyThrows
public static void main(String[] args) {
mybatisBatch mybatisBatch = new mybatisBatch();
mybatisBatch.init();
mybatisBatch.batchInsert();
}
@SneakyThrows
public void init() {
connection = DriverManager.getConnection(
"url",
"userName",
"password");
resourceAsReader = Resources.getResourceAsReader("mybatis.xml");
jdbcTransaction = new JdbcTransaction(connection);
sqlSessionFactory = new SqlSessionFactoryBuilder().build(resourceAsReader);
configuration = sqlSessionFactory.getConfiguration();
}
@SneakyThrows
public void batchInsert() {
BatchExecutor batchExecutor = new BatchExecutor(this.configuration, jdbcTransaction);
Zhengshangsuo alibaba = new Zhengshangsuo().setContract("alibaba").setContractId("alibaba").setType("alibaba");
Zhengshangsuo nio = new Zhengshangsuo().setContract("nio").setContractId("nio").setType("nio");
//指定方法
MappedStatement insertAlibaba = this.configuration.getMappedStatement("com.zzh.reptile.mapper.ZhengshangsuoMapper.insertAlibaba");
MappedStatement insertNio = this.configuration.getMappedStatement("com.zzh.reptile.mapper.ZhengshangsuoMapper.insertNio");
//关闭事务自动提交
this.connection.setAutoCommit(false);
//添加到批处理
batchExecutor.doUpdate(insertAlibaba, alibaba);
batchExecutor.doUpdate(insertAlibaba, alibaba);
batchExecutor.doUpdate(insertNio, nio);
batchExecutor.doUpdate(insertNio, nio);
//执行批处理逻辑,并且返回执行的结果
List<BatchResult> batchResults = batchExecutor.doFlushStatements(false);
//提交当前事务
this.connection.commit();
}
运行上面的代码,可以看的到插入四条数据,实际上对数据库只有俩次io。这是为什么呢?
其实不管是 Mybatis 也好MybatisPlus 也罢,当我们去调用 下图的这些 api 方法的时候,debug 追踪源码最终都是 各种执行器在起作用。
debug追踪一波源码
不管我们进行多少次 doUpdate()操作,debug发现永远都只会占用数据库的一个连接,其原因就在于
每次传入的 MappedStatement 都是相同的、 Mapper 上的 sql 也是一样的。
这行代码体现了批处理生效的条件,会将 statement、sql 相同的这些操作归类成同一批做批处理
sql.equals(this.currentSql) && ms.equals(this.currentStatement)
这行代码体现了,Sql参数拼接的过程,对此感兴趣的读者可以一路debug下去,就会来到下图的这个地方。我之前看mybatis源码的时候好像看过这一段…就不做详细说明了!
handler.parameterize(stmt);
小结 doUpdate()
所谓的批处理我个人一句话总结概括就是:单个 MappedStatement 的996,开创了一个部门的辉煌帝国。批处理:在一次 connection 中,反复对同一个 MappedStatement 进行sql拼接,并最终运行。从而达到一次处理批量执行的目的。
批处理的运行时机
还记得测试代码中的这行代码吗?debug进去就完事了。
batchExecutor.doFlushStatements(false);
源码很简单,关注下图标注的地方就ok了,遍历 statementList 中所有的 statement 然后执行就完事了。可以看到 mybatis 还是很贴心的,居然把执行过程中的一些数据返回给了我们。这些数据也就是批处理执行的返回结果
顺嘴提一波:所有的 statement 这几个字有俩层含义
- 经过参数拼接的 statement(批处理)
- 普通的 statement (处理单条数据)
批处理更新、查询生效吗?
上文已经研究了一下批处理插入的原理,虽然有多条数据都进行入库,但是可以做到对数据库只进行一次io的效果。反问一波批量更新数据是否也能做到只对数据库进行一次io操作呢?答案是可以的。那再反问批处理查询也可以做到只io一次数据库吗?答案是不可以的。原因也很简单批处理执行器(batchExcutor)的批处理功能只针对 doUpdate 操作生效,而 doUpdate 包括了插入、更新。
分析一波Mybatis-Plus的批处理
这里记得配置一下 application.yml中的数据源,我这里是在boot工程测试类中跑的测试、默认从配置文件中进行读取数据源
@Autowired
private ZhengshangsuoServiceImpl zhengshangsuoService;
@Test
public void mybatisPlusBatch() {
Zhengshangsuo data = new Zhengshangsuo().setContract("1").setContractId("1").setType("1");
ArrayList<Zhengshangsuo> list = new ArrayList<>();
list.add(data);
list.add(data);
zhengshangsuoService.saveBatch(list, 2);
}
debug一波源码,发现还是我们上文分析过的那些逻辑,兜兜转转又回到了 batchExecutor 中的doUpdate 方法。debug过程中会涉及到一点 aop的源码、spring 事务的源码。可以去看下这俩篇文章
- 全面解读spring注解事务失效场景,伪代码+图文深度解析spring源码运行过程
- spring 源码解析(配图文讲解)顺带搞懂了循环依赖、aop底层实现
附页
本文测试代码demo如下哦
package com.zzh.reptile;
import com.zzh.reptile.entity.Zhengshangsuo;
import lombok.SneakyThrows;
import org.apache.ibatis.executor.BatchExecutor;
import org.apache.ibatis.executor.BatchResult;
import org.apache.ibatis.executor.SimpleExecutor;
import org.apache.ibatis.io.Resources;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.RowBounds;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;
import org.apache.ibatis.transaction.jdbc.JdbcTransaction;
import java.io.Reader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.HashMap;
import java.util.List;
public class mybatisBatch {
private Configuration configuration;
private JdbcTransaction jdbcTransaction;
private Connection connection;
private Reader resourceAsReader;
private SqlSessionFactory sqlSessionFactory;
@SneakyThrows
public static void main(String[] args) {
mybatisBatch mybatisBatch = new mybatisBatch();
mybatisBatch.init();
mybatisBatch.batchInsert();
}
@SneakyThrows
public void init() {
connection = DriverManager.getConnection(
"jdbc:mysql://127.0.0.1:666/aaa?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT&useSSL=false",
"aaa",
"aaa");
resourceAsReader = Resources.getResourceAsReader("mybatis.xml");
jdbcTransaction = new JdbcTransaction(connection);
sqlSessionFactory = new SqlSessionFactoryBuilder().build(resourceAsReader);
configuration = sqlSessionFactory.getConfiguration();
}
@SneakyThrows
public void batchInsert() {
BatchExecutor batchExecutor = new BatchExecutor(this.configuration, jdbcTransaction);
Zhengshangsuo alibaba = new Zhengshangsuo().setContract("alibaba").setContractId("alibaba").setType("alibaba");
Zhengshangsuo nio = new Zhengshangsuo().setContract("nio").setContractId("nio").setType("nio");
//指定方法
MappedStatement insertAlibaba = this.configuration.getMappedStatement("com.zzh.reptile.mapper.ZhengshangsuoMapper.insertAlibaba");
MappedStatement insertNio = this.configuration.getMappedStatement("com.zzh.reptile.mapper.ZhengshangsuoMapper.insertNio");
//关闭事务自动提交
this.connection.setAutoCommit(false);
//添加到批处理
batchExecutor.doUpdate(insertAlibaba, alibaba);
batchExecutor.doUpdate(insertAlibaba, alibaba);
batchExecutor.doUpdate(insertNio, nio);
batchExecutor.doUpdate(insertNio, nio);
//执行批处理逻辑,并且返回执行的结果
List<BatchResult> batchResults = batchExecutor.doFlushStatements(false);
//提交当前事务
this.connection.commit();
}
@SneakyThrows
public void batchUpdate() {
BatchExecutor batchExecutor = new BatchExecutor(this.configuration, jdbcTransaction);
Zhengshangsuo data1 = new Zhengshangsuo().setContract("nio").setId(18513);
Zhengshangsuo data2 = new Zhengshangsuo().setContract("pdd").setId(18512);
MappedStatement update = this.configuration.getMappedStatement("com.zzh.reptile.mapper.ZhengshangsuoMapper.updatezzh");
this.connection.setAutoCommit(false);
batchExecutor.doUpdate(update, data1);
batchExecutor.doUpdate(update, data2);
batchExecutor.doFlushStatements(false);
this.connection.commit();
}
public void batchQuery() throws Exception {
HashMap<String, Object> queryMap = new HashMap<>();
queryMap.put("contract", "nio");
queryMap.put("id", 18513);
BatchExecutor batchExecutor = new BatchExecutor(this.configuration, jdbcTransaction);
MappedStatement query = this.configuration.getMappedStatement("com.zzh.reptile.mapper.ZhengshangsuoMapper.queryzzh");
this.connection.setAutoCommit(false);
List<Object> res = batchExecutor.doQuery(query, queryMap, RowBounds.DEFAULT, SimpleExecutor.NO_RESULT_HANDLER, query.getBoundSql(1));
List<Object> ress = batchExecutor.doQuery(query, queryMap, RowBounds.DEFAULT, SimpleExecutor.NO_RESULT_HANDLER, query.getBoundSql(1));
batchExecutor.doFlushStatements(false);
this.connection.commit();
System.err.println(res.toString());
System.err.println(ress);
}
}
public interface ZhengshangsuoMapper extends BaseMapper<Zhengshangsuo> {
@Insert("insert into zhengshangsuo (contract,contractId,type) values(#{contract},#{contractId},#{type})")
public int insertAlibaba(Zhengshangsuo zhengshangsuo);
@Insert("insert into zhengshangsuo (contract,contractId,type) values(#{contract},#{contractId},#{type})")
public int insertNio(Zhengshangsuo zhengshangsuo);
@Insert("insert into zhengshangsuo (contract,contractId,type) values(#{contract},#{contractId},#{type})")
public int insertzzh(Zhengshangsuo zhengshangsuo);
@Update("update zhengshangsuo set contract = #{contract} where id = #{id}")
public int updatezzh(Map<String, Object> map);
@Select("select * from zhengshangsuo where contract = #{contract} and id = #{id}")
public List<Zhengshangsuo> queryzzh(Map<String, Object> map);
}
@Autowired
private ZhengshangsuoServiceImpl zhengshangsuoService;
@Test
public void mybatisPlusBatch() {
Zhengshangsuo data = new Zhengshangsuo().setContract("1").setContractId("1").setType("1");
ArrayList<Zhengshangsuo> list = new ArrayList<>();
list.add(data);
list.add(data);
zhengshangsuoService.saveBatch(list, 2);
}