mybatis的流式查询,或者一次查询处理千万级数据

时间:2025-03-20 22:52:46

1:第一种方法:xml配置

    <select id="selectFetchSize" fetchSize="100" resultSetType="FORWARD_ONLY" resultType="">
        select * from entity_demo
    </select>

fetchSize:
fetchSize属性用于指定每次从数据库获取的记录数。这个属性可以用于控制查询操作的内存使用和性能。
当设置fetchSize时,MyBatis会根据这个值来调整JDBC的Statement对象的fetchSize属性。如果数据库和JDBC驱动支持,这可以减少网络往返次数,提高性能。
在分页查询或处理大量数据时,合理设置fetchSize可以有效地控制每次从数据库拉取的数据量,防止内存溢出。
在您的例子中,fetchSize="100"意味着每次从数据库获取100条记录。
resultSetType:
resultSetType属性用于定义结果集的滚动方向。MyBatis支持以下几种类型:
FORWARD_ONLY:结果集只能向前滚动,这是默认值,适用于大多数情况。
SCROLL_SENSITIVE:结果集可以向前或向后滚动,并且可以检测到数据库中的数据变化。
SCROLL_INSENSITIVE:结果集可以向前或向后滚动,但不会检测到数据库中的数据变化。
这个属性影响JDBC的Statement对象的resultSetType,它决定了结果集的可滚动性和可更新性。
在您的例子中,resultSetType="FORWARD_ONLY"意味着结果集只能向前滚动,这是最常用的类型,因为它通常提供更好的性能。
使用fetchSize和resultSetType可以帮助优化查询性能和资源使用。在处理大量数据或需要特定结果集行为时,这些属性尤其有用。然而,它们的实际效果还取决于数据库驱动程序和数据库服务器的性能特性。

1-2:第二种方法:Mapper类使用注解配置(注意设置必须没有返回值)

@Select("select * from entity_demo t ${}")
 @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = 100)
 @ResultType(entity_demo.class)
 void selectFetchSize(@Param(Constants.WRAPPER) QueryWrapper<entity_demo> wrapper, ResultHandler<entity_demo> handler);

1-3:EntityDemoMapper(注意设置必须没有返回值)

void selectFetchSize(@Param(Constants.WRAPPER) QueryWrapper<EntityDemo> wrapper, ResultHandler<EntityDemo> handler);

1-4:EntityDemoServiceImpl

    @Override
    public void streamGain() {
        QueryWrapper<EntityDemo> wrapper = new QueryWrapper<EntityDemo>();
        entityDemoMapper.selectFetchSize(wrapper, resultContext -> {
            EntityDemo orgData = resultContext.getResultObject();
            /**循环调用实现业务*/
            System.out.println(Thread.currentThread().getName()+"====================");
        });
    }

IEntityDemoService

    /**
     * 流方式获取数据
     * @return
     */
    List<EntityDemo> getList();

流式查询,对于一般查询是一次性把所有数据查询出来放在集合中,这时候GC回收释放不了这一部分内存,就会是堆内存用尽导致程序OOM。对于mybatis的流式查询, 一边查询一边做业务处理 ,这样用过的数据写入流之后就可以GC回收掉内存空间,使内存得到合理应用, 避免了OOM的发生

2:流式查询 指的是查询成功后不是返回一个集合而是返回一个迭代器,应用每次从迭代器取一条查询结果。流式查询的好处是能够降低内存使用,如果没有流式查询,我们想要从数据库取 1000 万条记录而又没有足够的内存时,就不得不分页查询,而分页查询效率取决于表设计,如果设计的不好,就无法执行高效的分页查询。因此流式查询是一个数据库访问框架必须具备的功能。流式查询的过程当中,数据库连接是保持打开状态的,因此要注意的是:执行一个流式查询后,数据库访问框架就不负责关闭数据库连接了,需要应用在取完数据后自己关闭

但构建 Cursor 的过程不简单,我们举个实际例子。下面是一个 Mapper 类:

@Mapper
public interface CustomerMapper extends BaseMapper<CustomerOne> {
    
    @Select("select * from customer limit #{limit}")
    Cursor<CustomerOne> scan(@Param("limit") int limit);

}

方法 scan() 是一个非常简单的查询。通过指定 Mapper 方法的返回值为 Cursor 类型,MyBatis 就知道这个查询方法一个流式查询

然后我们再写一个 SpringMVC Controller 方法来调用 Mappe

@Resource
private CustomerMapper customerMapper;

@GetMapping("test")
public void scan(int limit) throws Exception {
    try (Cursor<CustomerOne> cursor = customerMapper.scan(limit)) {  // 1
        /**从cursor 中取数据*/
        cursor.forEach(cus-> {
        System.out.println(cus.getName());});                     
    }
}

上面的代码看上去没什么问题,但是执行 scanFoo0() 时会报错:

java.lang.IllegalStateException: A Cursor is already closed.

这是因为我们前面说了在取数据的过程中需要保持数据库连接,而 Mapper 方法通常在执行完后连接就关闭了,因此 Cusor 也一并关闭了。所以,解决这个问题的思路不复杂,保持数据库连接打开即可。我们至少有三种方案可选

方案一:SqlSessionFactory,我们可以用 SqlSessionFactory 来手工打开数据库连接,将 Controller 方法修改如下:

import org.apache.ibatis.cursor.Cursor;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;

@Resource
SqlSessionFactory sqlSessionFactory;
 
@GetMapping("test")
public void scan(int limit) throws Exception {
    try (
        /**开启了一个 SqlSession (实际上也代表了一个数据库连接),并保证它最后能关闭*/
        SqlSession sqlSession = sqlSessionFactory.openSession(); 
        /**使用 SqlSession 来获得 Mapper 对象。这样才能保证得到的 Cursor 对象是打开状态的*/
        Cursor<CustomerOne> cursor =
              sqlSession.getMapper(CustomerMapper.class).scan(limit) 
    ) {
       /**从cursor 中取数据*/
        cursor.forEach(cus-> {
        System.out.println(cus.getName());});                     
      }
}

方案二:TransactionTemplate,在 Spring 中,我们可以用 TransactionTemplate 来执行一个数据库事务,这个过程中数据库连接同样是打开的。代码如下:

import org.apache.ibatis.cursor.Cursor;
import org.springframework.transaction.TransactionManager;

@Resource
private CustomerMapper customerMapper;

@Resource
TransactionManager transactionManager;

@GetMapping("test")
public void scan(int limit) throws Exception {
    TransactionTemplate transactionTemplate =
            new TransactionTemplate(transactionManager);
    /**处执行数据库事务,而数据库事务的内容则是调用 Mapper 对象的流式查询*/
    transactionTemplate.execute(status -> {   
        try (Cursor<CustomerOne> cursor = customerMapper.scan(limit)) {
            cursor.forEach(cus-> {
            System.out.println(cus.getName());});  
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    });
}

方案三:@Transactional 注解,这个本质上和方案二一样,代码如下:

@Transactional
@GetMapping("test")
public void scan(int limit) throws Exception {
    try (Cursor<CustomerOne> cursor = customerMapper.scan(limit)) {
        cursor.forEach(cus-> {
        System.out.println(cus.getName());});
    }
}

它仅仅是在原来方法上面加了个 @Transactional 注解。这个方案看上去最简洁,但请注意 Spring 框架当中注解使用的坑:只在外部调用时生效。在当前类中调用这个方法,依旧会报错