mybatis 使用游标Cursor 流式查询

时间:2025-03-20 22:54:32

mybatis 使用游标Cursor 流式查询

背景:

因为项目需要,Sql查询出来的数据有接近50万条,因才用数据仓库(doris),SQL查询出来的时间只需要4s左右,但是由于全量加载不可行,导致长时间卡顿问题;

解决方案:

才用游标Cursor 流式查询配合 注解@Options(fetchSize = Integer.MIN_VALUE),可以最大限度的降低客户端内存消耗

定义

流式查询 指的是查询成功后不是返回一个集合而是返回一个迭代器,应用每次从迭代器取一条查询结果。流式查询的好处是能够降低内存使用。
如果没有流式查询,我们想要从数据库取 1000 万条记录而又没有足够的内存时,就不得不分页查询,而分页查询效率取决于表设计,如果设计的不好,就无法执行高效的分页查询。因此流式查询是一个数据库访问框架必须具备的功能。

MyBatis 中使用流式查询避免数据量过大导致 OOM ,但在流式查询的过程当中,数据库连接是保持打开状态的,因此要注意的是:

  • 执行一个流式查询后,数据库访问框架就不负责关闭数据库连接了,需要应用在取完数据后自己关闭。
  • 必须先读取(或关闭)结果集中的所有行,然后才能对连接发出任何其他查询,否则将引发异常。

流式查询接口

MyBatis 提供了一个叫 的接口类用于流式查询,这个接口继承了 接口,由此可知:
Cursor 是可关闭的;Cursor 是可遍历的。

除此之外,Cursor 还提供了三个方法:

  • isOpen(): 用于在取数据之前判断 Cursor 对象是否是打开状态。只有当打开时 Cursor 才能取数据;
  • isConsumed(): 用于判断查询结果是否全部取完。
  • getCurrentIndex(): 返回已经获取了多少条数据

使用流式查询,则要保持对产生结果集的语句所引用的表的并发访问,因为其查询会独占连接,所以必须尽快处理

完整代码

Mapper
    @Options(fetchSize = Integer.MIN_VALUE)
    Cursor<NetworkOutputTableVO> getGtv3(@Param("vo") ExportExcelVO vo);
Service
    //流式查询
    @Override
    public void getOneByAsync() throws InterruptedException {
        new Thread(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                //根据实际情况要不要加,我这是因为业务需求要切换数据源
                DynamicDataSourceContextHolder.push("doris");

                SqlSession sqlSession = getSqlSessionFactory().openSession();
                log.info("----开启sqlSession");
                try {

                    ExportExcelVO ex = new ExportExcelVO();

                    long startTime = System.currentTimeMillis();

                    //获取到指定mapper
                    NetworkOutputTableMapper mapper = sqlSession.getMapper(NetworkOutputTableMapper.class);

                    //查询数据总量
                    Integer total = mapper.getTotal();

                    Cursor<NetworkOutputTableVO> cursor = mapper.getGtv3(ex);
                    List<NetworkOutputTableVO> list = new ArrayList<>();

                    int i=0;
                    if (cursor != null) {
                        for (NetworkOutputTableVO person : cursor) {
                            if (list.size() < 10000) {
//                            ("----id:{},userName:{}", (), ());
                                list.add(person);
                            } else if (list.size() == 10000) {
                                ++i;
                                log.info("----{}、从cursor取数据达到10000条,开始处理数据", i);
                                log.info("----处理数据中...");
                                Thread.sleep(1000);//休眠1s模拟处理数据需要消耗的时间;
                                log.info("----{}、从cursor中取出的10000条数据已经处理完毕", i);
                                list.clear();
                                list.add(person);
                            }
                            if (total == (cursor.getCurrentIndex() + 1)) {
                                ++i;
                                log.info("----{}、从cursor取数据达到10000条,开始处理数据", i);
                                log.info("----处理数据中...");
                                Thread.sleep(1000);//休眠1s模拟处理数据需要消耗的时间;
                                log.info("----{}、从cursor中取出的10000条数据已经处理完毕", i);
                                list.clear();
                            }
                        }
                        if (cursor.isConsumed()) {
                            log.info("----查询sql匹配中的数据已经消费完毕!");
                        }
                    }
                    sqlSession.commit();
                    log.info("----提交事务");
                    long endTime = System.currentTimeMillis();
                    System.out.println(String.format("streamForEach:%s", endTime - startTime));
                    System.out.println(i);
                    System.out.println(cursor.getCurrentIndex()+1);
                    DynamicDataSourceContextHolder.poll();
                }catch (Exception e){
                    e.printStackTrace();
                    sqlSession.rollback();
                }
                finally {
                    if (sqlSession != null) {
                        //全部数据读取并且做好其他业务操作之后,提交事务并关闭连接;
                        sqlSession.close();
                        log.info("----关闭sqlSession");
                    }
                }
            }
        }).start();

        }

参考地址: /jingzh/p/