【项目实战】并发编程之AtomicInteger入门介绍

时间:2025-03-27 11:53:55
/** * 分片导出数据到Excel,避免一次性数据过大,产生OOM */ public class ExportUtil<X> { private Log logger = LogFactory.get(); private JpaSpecificationExecutor<X> jpaSpecificationExecutor; private Specification<X> spec; private EntityManager em; private String sql; private String sheetName; Map<String, Object> sqlParams; private final static int BATCH_SIZE = 2000; private final static int SHEET_SIZE = 50000; private final AtomicInteger currentLine = new AtomicInteger(0); private final ExecutorService executorService = Executors.newCachedThreadPool(); private final static int defaultThreadNum = 2; private final static int maxThreadNum = 4; // Excel自动适应宽度比较耗时,因此在数据量大于这个阀值时,不保证所有的单元格都自动适应大小 private final static int autoSizeThresholdNum = SHEET_SIZE; // 每个线程处理的记录数 private final static int perThreadRow = 10000; /** * 通过JPA的方式将查询到的结果导出到Excel * * @param jpaSpecificationExecutor * @param spec */ public ExportUtil(@NotNull JpaSpecificationExecutor<X> jpaSpecificationExecutor, @NotNull Specification<X> spec) { this.jpaSpecificationExecutor = jpaSpecificationExecutor; this.spec = spec; } /** * 通过写SQL语句的方式将结果导出到Excel * * @param em EntityManager * @param sql sql语句必须匹配:"SELECT .* FROM .*",且不能以limit子句结尾 * @deprecated 建议使用JPA的方式导出数据 */ @Deprecated public ExportUtil(@NotNull EntityManager em, @NotNull String sql) { this.em = em; this.sql = sql; } /** * 通过写SQL语句的方式将结果导出到Excel * * @param em EntityManager * @param sql sql语句必须匹配:"SELECT .* FROM .*",且不能以limit子句结尾 * @param sqlParams sql的参数 * @deprecated 建议使用JPA的方式导出数据 */ @Deprecated public ExportUtil(@NotNull EntityManager em, @NotNull String sql, Map<String, Object> sqlParams) { this(em, sql); this.sqlParams = sqlParams; } public String export(@NotNull String sheetName, @NotNull Function<? super X, Map<String, Object>> process) throws IOException { this.sheetName = sheetName; return this.export(process); } public String export(@NotNull String sheetName) throws IOException { this.sheetName = sheetName; return this.export(this::process); } private Map<String, Object> process(X x) { Map<String, Object> row = new LinkedHashMap<>(); Field[] fields = ReflectUtil.getFields(x.getClass()); for (Field field : fields) { ExcelProperty excelProperty = field.getAnnotation(ExcelProperty.class); if (excelProperty == null) { continue; } String columnName = excelProperty.columnName(); String value = ReflectUtil.invoke(x, StrUtil.upperFirstAndAddPre(field.getName(), "get")); row.put(columnName, value); } return row; } /** * 从数据库到Excel导出流程 * * @param fristRowName * @param response * @param process * @throws IOException */ public String export(@NotNull Function<? super X, Map<String, Object>> process) throws IOException { String tempFile = genTempFile(); ExcelWriter writer = ExcelUtil.getBigWriter(tempFile, sheetName); long count = Optional.ofNullable(jpaSpecificationExecutor).map(x -> x.count(spec)).orElseGet(() -> new SqlQueryData(0, 1).count()); int threadNum = Math.min(maxThreadNum, Math.max((int) count / perThreadRow, defaultThreadNum)); Set<Future<?>> submitSet = new HashSet<>(threadNum); for (int i = 0; i < count; i += BATCH_SIZE) { waiteSubmit(submitSet, threadNum - 1);// 保证至少有一个空闲线程才向下执行 QueryData queryData; if (Objects.nonNull(jpaSpecificationExecutor)) { queryData = new JpaQueryData(i, BATCH_SIZE); } else { queryData = new SqlQueryData(i, BATCH_SIZE); } Future<?> submit = executorService.submit(new DB2Excel(queryData, writer, process)); submitSet.add(submit); } waiteSubmit(submitSet, 0);// 保证所有线程都执行完成才向下执行 if (count <= autoSizeThresholdNum && count > 0) { // (); } writer.close(); // String compressedFile = compressFile(tempFile); // flush(response, tempFile, fristRowName); return tempFile; } /** * 等待指定数量的线程结束 * * @param submits 线程结果集 * @param max 最大允许多少个正在执行的线程,如果超过这个数量,则等待 */ private void waiteSubmit(Collection<Future<?>> submits, int max) { while (submits.size() > max) { for (Future<?> submit : submits) { try { // 查询这个线程是否结束,如果结束从线程结果集中删除 submit.get(10, TimeUnit.MILLISECONDS); submits.remove(submit); break; } catch (TimeoutException e) { // 正常超时,查询下一个线程 } catch (ExecutionException e) { logger.warn("等待查询导出线程过程中出现异常,不再查询这个线程:{}", e.getMessage()); submits.remove(submit); } catch (InterruptedException e) { logger.warn("等待查询导出线程过程中收到中断信号:{}", e.getMessage()); } } } } public String combineExcel(@NotNull String sheetName, @NotNull Function<? super X, Map<String, Object>> process, @NotNull String sheetName1, @NotNull Function<? super Map<String, Object>, Map<String, Object>> process1, @NotNull JdbcTemplate em, @NotNull String sql, Object[] objects, String orderSql) throws IOException { String tempFile = genTempFile(); ExcelWriter writer = new BigExcelWriter(tempFile, sheetName); export(process, writer); @SuppressWarnings("deprecation") ExportUtilFromHbase<Map<String, Object>> exportUtil = new ExportUtilFromHbase<Map<String, Object>>(em, sql, objects, orderSql); exportUtil.export(process1, writer, sheetName1); // export(process1, writer); writer.close(); return tempFile; } /** * 查询类基类 */ abstract class QueryData { protected int offset; protected int limit; /** * 查询参数 */ QueryData(int offset, int limit) { this.offset = offset; this.limit = limit; } /** * 查询结果总数 */ abstract long count(); /** * 查询结果 */ abstract public List<X> query(); @Override public String toString() { return "QueryData{" + "offset=" + offset + ", limit=" + limit + '}'; } } /** * JPA查询实现类 */ class JpaQueryData extends QueryData { JpaQueryData(int offset, int limit) { super(offset, limit); } @Override long count() { return jpaSpecificationExecutor.count(spec); } @Override public List<X> query() { Pageable pageable = PageRequest.of(offset / BATCH_SIZE, BATCH_SIZE); Page<X> page = jpaSpecificationExecutor.findAll(spec, pageable); return page.getContent(); } } /** * SQL查询实现类 */ class SqlQueryData extends QueryData { private static final String SQL_REG = "SELECT .* FROM .*"; String convertCountSql() { if (!sql.matches(SQL_REG)) { logger.warn("非法的SQL查询语句:{}"); throw new RuntimeException("不能查询到有效的数据!"); } int fromIdx = sql.indexOf("FROM"); return "SELECT count(1) AS total " + sql.substring(fromIdx); } SqlQueryData(int offset, int limit) { super(offset, limit); } @Override long count() { Query query = em.createNativeQuery(convertCountSql()); query.setMaxResults(BATCH_SIZE); query.setFirstResult(0); if (Objects.nonNull(sqlParams)) { sqlParams.forEach((x, y) -> query.setParameter(x, y)); } Object singleResult = query.getSingleResult(); return Long.parseLong(singleResult.toString()); } @SuppressWarnings("unchecked") @Override public List<X> query() { Query query = em.createNativeQuery(sql); query.setMaxResults(BATCH_SIZE); query.setFirstResult(offset); if (Objects.nonNull(sqlParams)) { sqlParams.forEach((x, y) -> query.setParameter(x, y)); } query.unwrap(NativeQueryImpl.class).setResultTransformer(Transformers.ALIAS_TO_ENTITY_MAP); return query.getResultList(); } } /** * 产生一个临时文件名 * * @return * @throws UnsupportedEncodingException */ private String genTempFile() throws UnsupportedEncodingException { String tmpDir = System.getProperty(""); if (!tmpDir.endsWith(File.separator)) { tmpDir += File.separator; } return String.format("%s%s-%", tmpDir, cn.hutool.core.date.DateUtil.date().toString(DatePattern.PURE_DATETIME_MS_PATTERN), RandomUtil.randomInt(100000)); } /** * 导出实现类 */ class DB2Excel implements Runnable { QueryData queryData; ExcelWriter excelWriter; Function<? super X, Map<String, Object>> process; DB2Excel(@NotNull QueryData queryData, @NotNull ExcelWriter excelWriter, @NotNull Function<? super X, Map<String, Object>> process) { this.queryData = queryData; this.excelWriter = excelWriter; this.process = process; } @Override public void run() { logger.info("开始查询导出:{}", queryData); List<Map<String, Object>> content = queryData.query().stream().map(process).collect(Collectors.toList()); int contentSize = content.size(); long totalElements = queryData.count(); synchronized (excelWriter) { if (0 == currentLine.get() % SHEET_SIZE && CollectionUtil.isNotEmpty(content)) { if (currentLine.get() > 0 && totalElements <= autoSizeThresholdNum) { // (); } if (currentLine.get() > 0) { excelWriter.setSheet(sheetName + (currentLine.get() / SHEET_SIZE + 1)); } if (totalElements > autoSizeThresholdNum) { List<Object> firstLine = new ArrayList<>(1); firstLine.add(content.get(0)); excelWriter.write(firstLine, true); // (); content.remove(firstLine.get(0)); excelWriter.write(content, false); } else { excelWriter.write(content, true); } } else { excelWriter.write(content, false); } currentLine.addAndGet(contentSize); } } } public void export(@NotNull Function<? super X, Map<String, Object>> process, ExcelWriter writer) throws IOException { long count = Optional.ofNullable(jpaSpecificationExecutor).map(x -> x.count(spec)).orElseGet(() -> new SqlQueryData(0, 1).count()); int threadNum = Math.min(maxThreadNum, Math.max((int) count / perThreadRow, defaultThreadNum)); Set<Future<?>> submitSet = new HashSet<>(threadNum); for (int i = 0; i < count; i += BATCH_SIZE) { waiteSubmit(submitSet, threadNum - 1);// 保证至少有一个空闲线程才向下执行 QueryData queryData; if (Objects.nonNull(jpaSpecificationExecutor)) { queryData = new JpaQueryData(i, BATCH_SIZE); } else { queryData = new SqlQueryData(i, BATCH_SIZE); } Future<?> submit = executorService.submit(new DB2Excel(queryData, writer, process)); submitSet.add(submit); } waiteSubmit(submitSet, 0);// 保证所有线程都执行完成才向下执行 if (count <= autoSizeThresholdNum && count > 0) { // (); } } }