mybatis 使用游标Cursor 流式查询
背景:
因为项目需要,Sql查询出来的数据有接近50万条,因才用数据仓库(doris),SQL查询出来的时间只需要4s左右,但是由于全量加载不可行,导致长时间卡顿问题;
解决方案:
才用游标Cursor 流式查询配合 注解@Options(fetchSize = Integer.MIN_VALUE),可以最大限度的降低客户端内存消耗
定义
流式查询
指的是查询成功后不是返回一个集合而是返回一个迭代器,应用每次从迭代器取一条查询结果。流式查询的好处是能够降低内存使用。
如果没有流式查询,我们想要从数据库取 1000 万条
记录而又没有足够的内存时,就不得不分页查询,而分页查询效率取决于表设计,如果设计的不好,就无法执行高效的分页查询。因此流式查询是一个数据库访问框架必须具备的功能。
MyBatis
中使用流式查询避免数据量过大导致 OOM
,但在流式查询的过程当中,数据库连接是保持打开状态的,因此要注意的是:
- 执行一个流式查询后,数据库访问框架就不负责关闭数据库连接了,需要应用在取完数据后自己关闭。
- 必须先读取(或关闭)结果集中的所有行,然后才能对连接发出任何其他查询,否则将引发异常。
流式查询接口
MyBatis
提供了一个叫 的接口类用于流式查询,这个接口继承了
和
接口,由此可知:
Cursor
是可关闭的;Cursor
是可遍历的。
除此之外,Cursor
还提供了三个方法:
-
isOpen()
: 用于在取数据之前判断Cursor
对象是否是打开状态。只有当打开时 Cursor 才能取数据; -
isConsumed()
: 用于判断查询结果是否全部取完。 -
getCurrentIndex()
: 返回已经获取了多少条数据
使用流式查询,则要保持对产生结果集的语句所引用的表的并发访问,因为其查询会独占连接,所以必须尽快处理
完整代码
Mapper
@Options(fetchSize = Integer.MIN_VALUE)
Cursor<NetworkOutputTableVO> getGtv3(@Param("vo") ExportExcelVO vo);
Service
//流式查询
@Override
public void getOneByAsync() throws InterruptedException {
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
//根据实际情况要不要加,我这是因为业务需求要切换数据源
DynamicDataSourceContextHolder.push("doris");
SqlSession sqlSession = getSqlSessionFactory().openSession();
log.info("----开启sqlSession");
try {
ExportExcelVO ex = new ExportExcelVO();
long startTime = System.currentTimeMillis();
//获取到指定mapper
NetworkOutputTableMapper mapper = sqlSession.getMapper(NetworkOutputTableMapper.class);
//查询数据总量
Integer total = mapper.getTotal();
Cursor<NetworkOutputTableVO> cursor = mapper.getGtv3(ex);
List<NetworkOutputTableVO> list = new ArrayList<>();
int i=0;
if (cursor != null) {
for (NetworkOutputTableVO person : cursor) {
if (list.size() < 10000) {
// ("----id:{},userName:{}", (), ());
list.add(person);
} else if (list.size() == 10000) {
++i;
log.info("----{}、从cursor取数据达到10000条,开始处理数据", i);
log.info("----处理数据中...");
Thread.sleep(1000);//休眠1s模拟处理数据需要消耗的时间;
log.info("----{}、从cursor中取出的10000条数据已经处理完毕", i);
list.clear();
list.add(person);
}
if (total == (cursor.getCurrentIndex() + 1)) {
++i;
log.info("----{}、从cursor取数据达到10000条,开始处理数据", i);
log.info("----处理数据中...");
Thread.sleep(1000);//休眠1s模拟处理数据需要消耗的时间;
log.info("----{}、从cursor中取出的10000条数据已经处理完毕", i);
list.clear();
}
}
if (cursor.isConsumed()) {
log.info("----查询sql匹配中的数据已经消费完毕!");
}
}
sqlSession.commit();
log.info("----提交事务");
long endTime = System.currentTimeMillis();
System.out.println(String.format("streamForEach:%s", endTime - startTime));
System.out.println(i);
System.out.println(cursor.getCurrentIndex()+1);
DynamicDataSourceContextHolder.poll();
}catch (Exception e){
e.printStackTrace();
sqlSession.rollback();
}
finally {
if (sqlSession != null) {
//全部数据读取并且做好其他业务操作之后,提交事务并关闭连接;
sqlSession.close();
log.info("----关闭sqlSession");
}
}
}
}).start();
}
参考地址: /jingzh/p/