【项目实战】并发编程之AtomicInteger入门介绍
/**
 * Exports query results to an Excel file in batches so that the whole result set is never held in
 * memory at once (avoiding OOM).
 *
 * <p>Batches of {@code BATCH_SIZE} rows are queried and converted on a worker pool. Writes to the
 * shared {@link ExcelWriter} are serialized by synchronizing on the writer, and a new sheet is
 * started every {@code SHEET_SIZE} rows. Because batches run in parallel, rows appear in the order
 * in which batches finish, not necessarily in query order.
 *
 * <p>Instances keep per-export state ({@code sheetName}, {@code currentLine}), so a single instance
 * must not run two exports concurrently.
 */
public class ExportUtil<X> {
    private Log logger = LogFactory.get();
    private JpaSpecificationExecutor<X> jpaSpecificationExecutor;
    private Specification<X> spec;
    private EntityManager em;
    private String sql;
    /** Base sheet name; roll-over sheets are named {@code sheetName + index}. */
    private String sheetName;
    /** Named parameters bound to the native SQL query (SQL mode only); may be {@code null}. */
    Map<String, Object> sqlParams;
    /** Rows fetched per query / per worker task. */
    private final static int BATCH_SIZE = 2000;
    /** Rows per sheet before switching to a new sheet. */
    private final static int SHEET_SIZE = 50000;
    /** Data rows written so far in the current export; advanced while holding the writer's monitor. */
    private final AtomicInteger currentLine = new AtomicInteger(0);
    /**
     * Worker pool for batch tasks. Idle threads of a cached pool terminate after 60 seconds, so the
     * pool is not shut down explicitly and the instance can be reused for several exports.
     */
    private final ExecutorService executorService = Executors.newCachedThreadPool();
    private final static int defaultThreadNum = 2;
    private final static int maxThreadNum = 4;
    // Auto-fitting column widths is slow, so above this row count not every cell is guaranteed to be auto-sized.
    private final static int autoSizeThresholdNum = SHEET_SIZE;
    // Number of rows per worker thread, used to derive the thread count.
    private final static int perThreadRow = 10000;

    /**
     * Exports the results of a JPA specification query to Excel.
     *
     * @param jpaSpecificationExecutor repository used to count and page through the results
     * @param spec                     query specification
     */
    public ExportUtil(@NotNull JpaSpecificationExecutor<X> jpaSpecificationExecutor, @NotNull Specification<X> spec) {
        this.jpaSpecificationExecutor = jpaSpecificationExecutor;
        this.spec = spec;
    }

    /**
     * Exports the results of a native SQL statement to Excel.
     *
     * @param em  EntityManager
     * @param sql SQL statement; must match {@code SELECT .* FROM .*} (single line, upper-case
     *            keywords) and must not end with a LIMIT clause
     * @deprecated the JPA-based export is recommended
     */
    @Deprecated
    public ExportUtil(@NotNull EntityManager em, @NotNull String sql) {
        this.em = em;
        this.sql = sql;
    }

    /**
     * Exports the results of a parameterized native SQL statement to Excel.
     *
     * @param em        EntityManager
     * @param sql       SQL statement; must match {@code SELECT .* FROM .*} (single line, upper-case
     *                  keywords) and must not end with a LIMIT clause
     * @param sqlParams named parameters of the SQL statement
     * @deprecated the JPA-based export is recommended
     */
    @Deprecated
    public ExportUtil(@NotNull EntityManager em, @NotNull String sql, Map<String, Object> sqlParams) {
        this(em, sql);
        this.sqlParams = sqlParams;
    }

    /**
     * Exports into a new temporary file, using {@code sheetName} as the base sheet name.
     *
     * @param sheetName base sheet name
     * @param process   converts one result into an ordered column-name -> value row
     * @return path of the generated file
     */
    public String export(@NotNull String sheetName, @NotNull Function<? super X, Map<String, Object>> process) throws IOException {
        this.sheetName = sheetName;
        return this.export(process);
    }

    /**
     * Exports into a new temporary file. Columns come from the {@code ExcelProperty}-annotated fields
     * of each result.
     *
     * @param sheetName base sheet name
     * @return path of the generated file
     */
    public String export(@NotNull String sheetName) throws IOException {
        this.sheetName = sheetName;
        return this.export(this::process);
    }

    /**
     * Default row converter. It reads every field annotated with {@code ExcelProperty} through its
     * {@code getXxx} getter and stores the value under the annotation's column name, in field order.
     */
    private Map<String, Object> process(X x) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (Field field : ReflectUtil.getFields(x.getClass())) {
            ExcelProperty excelProperty = field.getAnnotation(ExcelProperty.class);
            if (excelProperty == null) {
                continue;
            }
            // ReflectUtil.invoke is generic in its return type. Assigning the result to String would
            // throw ClassCastException for numeric/date getters, so keep the raw value.
            Object value = ReflectUtil.invoke(x, StrUtil.upperFirstAndAddPre(field.getName(), "get"));
            row.put(excelProperty.columnName(), value);
        }
        return row;
    }

    /**
     * Runs the complete database-to-Excel export into a new temporary file.
     *
     * @param process converts one result into an ordered column-name -> value row
     * @return path of the generated file
     * @throws IOException propagated from {@link #export(Function, ExcelWriter)}
     */
    public String export(@NotNull Function<? super X, Map<String, Object>> process) throws IOException {
        String tempFile = genTempFile();
        ExcelWriter writer = ExcelUtil.getBigWriter(tempFile, sheetName);
        try {
            export(process, writer);
        } finally {
            // Close (and flush) the writer even if the export fails, so the file handle is released.
            writer.close();
        }
        return tempFile;
    }

    /**
     * Blocks until at most {@code max} of the given futures are still unfinished.
     *
     * <p>Finished futures, whether successful or failed, are removed from {@code submits}. A failed
     * batch is logged and dropped, so the export continues without its rows. If the waiting thread is
     * interrupted, waiting continues and the interrupt status is restored before returning.
     *
     * @param submits futures of the submitted batch tasks; modified in place
     * @param max     maximum number of futures allowed to remain
     */
    private void waiteSubmit(Collection<Future<?>> submits, int max) {
        boolean interrupted = false;
        while (submits.size() > max) {
            // Collect finished futures and remove them after the scan. Removing them while iterating
            // the collection would throw ConcurrentModificationException.
            List<Future<?>> finished = new ArrayList<>();
            for (Future<?> submit : submits) {
                try {
                    // Poll briefly; a timeout just means the batch is still running.
                    submit.get(10, TimeUnit.MILLISECONDS);
                    finished.add(submit);
                } catch (TimeoutException e) {
                    // Still running; check the next one.
                } catch (ExecutionException e) {
                    logger.warn(e.getCause(), "等待查询导出线程过程中出现异常,不再查询这个线程:{}", e.getMessage());
                    finished.add(submit);
                } catch (InterruptedException e) {
                    // Keep waiting so the writer is not closed while batches are still writing.
                    // The interrupt is remembered and re-asserted below instead of being swallowed.
                    logger.warn("等待查询导出线程过程中收到中断信号:{}", e.getMessage());
                    interrupted = true;
                }
            }
            submits.removeAll(finished);
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Writes two result sets into one new temporary workbook. The first comes from this instance's
     * query; the second comes from the given native SQL via {@code ExportUtilFromHbase}.
     *
     * @param sheetName  base sheet name of the first result set
     * @param process    row converter for the first result set
     * @param sheetName1 sheet name of the second result set
     * @param process1   row converter for the second result set
     * @param em         JdbcTemplate used for the second query
     * @param sql        SQL of the second query
     * @param objects    parameters of the second query
     * @param orderSql   ordering clause for the second query
     * @return path of the generated file
     */
    public String combineExcel(@NotNull String sheetName, @NotNull Function<? super X, Map<String, Object>> process,
                               @NotNull String sheetName1, @NotNull Function<? super Map<String, Object>, Map<String, Object>> process1,
                               @NotNull JdbcTemplate em, @NotNull String sql, Object[] objects, String orderSql) throws IOException {
        // Sheet roll-over in DB2Excel derives new sheet names from this.sheetName, so keep it in sync
        // with the first sheet, as export(String, Function) does.
        this.sheetName = sheetName;
        String tempFile = genTempFile();
        ExcelWriter writer = new BigExcelWriter(tempFile, sheetName);
        try {
            export(process, writer);
            @SuppressWarnings("deprecation")
            ExportUtilFromHbase<Map<String, Object>> exportUtil = new ExportUtilFromHbase<Map<String, Object>>(em, sql, objects, orderSql);
            exportUtil.export(process1, writer, sheetName1);
        } finally {
            writer.close();
        }
        return tempFile;
    }

    /**
     * Base class for one paged query: a window of {@code limit} rows starting at {@code offset}.
     */
    abstract class QueryData {
        protected int offset;
        protected int limit;

        /**
         * @param offset index of the first row to fetch
         * @param limit  maximum number of rows to fetch
         */
        QueryData(int offset, int limit) {
            this.offset = offset;
            this.limit = limit;
        }

        /**
         * Returns the total number of rows the underlying query produces, ignoring offset and limit.
         */
        abstract long count();

        /**
         * Returns the rows of this window.
         */
        abstract public List<X> query();

        @Override
        public String toString() {
            return "QueryData{" + "offset=" + offset + ", limit=" + limit + '}';
        }
    }

    /**
     * Query implementation backed by a JPA specification.
     */
    class JpaQueryData extends QueryData {
        JpaQueryData(int offset, int limit) {
            super(offset, limit);
        }

        @Override
        long count() {
            return jpaSpecificationExecutor.count(spec);
        }

        @Override
        public List<X> query() {
            // Offsets are produced in steps of `limit`, so offset / limit is the page number.
            Pageable pageable = PageRequest.of(offset / limit, limit);
            Page<X> page = jpaSpecificationExecutor.findAll(spec, pageable);
            return page.getContent();
        }
    }

    /**
     * Query implementation backed by a native SQL statement.
     */
    class SqlQueryData extends QueryData {
        private static final String SQL_REG = "SELECT .* FROM .*";

        SqlQueryData(int offset, int limit) {
            super(offset, limit);
        }

        /**
         * Rewrites {@code sql} into a {@code SELECT count(1)} over the same FROM clause.
         *
         * @throws RuntimeException if {@code sql} does not match {@code SQL_REG}
         */
        String convertCountSql() {
            if (!sql.matches(SQL_REG)) {
                logger.warn("非法的SQL查询语句:{}", sql);
                throw new RuntimeException("不能查询到有效的数据!");
            }
            // SQL_REG guarantees that " FROM " is present. Searching with the surrounding spaces keeps
            // identifiers such as DATE_FROM from being mistaken for the keyword.
            int fromIdx = sql.indexOf(" FROM ");
            return "SELECT count(1) AS total" + sql.substring(fromIdx);
        }

        @Override
        long count() {
            Query query = em.createNativeQuery(convertCountSql());
            bindParams(query);
            return Long.parseLong(query.getSingleResult().toString());
        }

        @SuppressWarnings("unchecked")
        @Override
        public List<X> query() {
            Query query = em.createNativeQuery(sql);
            query.setFirstResult(offset);
            query.setMaxResults(limit);
            bindParams(query);
            // Rows come back as column-alias -> value maps; they are handed on as X unchecked.
            query.unwrap(NativeQueryImpl.class).setResultTransformer(Transformers.ALIAS_TO_ENTITY_MAP);
            return query.getResultList();
        }

        /** Binds {@code sqlParams} (if any) to the query as named parameters. */
        private void bindParams(Query query) {
            if (Objects.nonNull(sqlParams)) {
                sqlParams.forEach((name, value) -> query.setParameter(name, value));
            }
        }
    }

    /**
     * Generates a unique-ish file path in the JVM temp directory, built from a timestamp (with
     * milliseconds) and a random suffix.
     *
     * @return absolute path of the file to create
     */
    private String genTempFile() {
        String tmpDir = System.getProperty("java.io.tmpdir");
        if (!tmpDir.endsWith(File.separator)) {
            tmpDir += File.separator;
        }
        // BigExcelWriter produces an xlsx workbook, so give the file the matching extension.
        return String.format("%s%s-%d.xlsx", tmpDir,
                cn.hutool.core.date.DateUtil.date().toString(DatePattern.PURE_DATETIME_MS_PATTERN),
                RandomUtil.randomInt(100000));
    }

    /**
     * Worker task that queries one batch, converts it to rows and appends them to the shared writer.
     */
    class DB2Excel implements Runnable {
        QueryData queryData;
        ExcelWriter excelWriter;
        Function<? super X, Map<String, Object>> process;
        /** Total row count of the export, or a negative value to query it on demand. */
        final long totalElements;

        DB2Excel(@NotNull QueryData queryData, @NotNull ExcelWriter excelWriter,
                 @NotNull Function<? super X, Map<String, Object>> process) {
            this(queryData, excelWriter, process, -1);
        }

        /**
         * @param totalElements total row count, already known by the caller; passing it avoids one
         *                      count query per batch (negative = query it)
         */
        DB2Excel(@NotNull QueryData queryData, @NotNull ExcelWriter excelWriter,
                 @NotNull Function<? super X, Map<String, Object>> process, long totalElements) {
            this.queryData = queryData;
            this.excelWriter = excelWriter;
            this.process = process;
            this.totalElements = totalElements;
        }

        @Override
        public void run() {
            logger.info("开始查询导出:{}", queryData);
            // Query and convert outside the lock so batches can do this work in parallel.
            List<Map<String, Object>> content = queryData.query().stream().map(process).collect(Collectors.toList());
            int contentSize = content.size();
            long total = totalElements >= 0 ? totalElements : queryData.count();
            synchronized (excelWriter) {
                int line = currentLine.get();
                if (line % SHEET_SIZE == 0 && CollectionUtil.isNotEmpty(content)) {
                    // This batch starts a sheet: switch sheets if needed, then write it with a header.
                    if (line > 0) {
                        // NOTE(review): only a garbled "// ();" remained here (presumably column
                        // auto-sizing of the finished sheet when total <= autoSizeThresholdNum). TODO confirm.
                        excelWriter.setSheet(sheetName + (line / SHEET_SIZE + 1));
                    }
                    if (total > autoSizeThresholdNum) {
                        // Header plus first row on their own, so that columns could be sized from that
                        // row alone. NOTE(review): the sizing call that followed appears to have been
                        // lost (garbled "// ();"). TODO confirm and restore.
                        excelWriter.write(content.subList(0, 1), true);
                        excelWriter.write(content.subList(1, contentSize), false);
                    } else {
                        excelWriter.write(content, true);
                    }
                } else {
                    excelWriter.write(content, false);
                }
                currentLine.addAndGet(contentSize);
            }
        }
    }

    /**
     * Queries all results in batches on the worker pool and writes them into {@code writer}. It
     * returns once every batch has finished. The writer is not closed; the caller owns it.
     *
     * @param process converts one result into an ordered column-name -> value row
     * @param writer  destination writer; its current sheet receives the first rows
     */
    public void export(@NotNull Function<? super X, Map<String, Object>> process, ExcelWriter writer) throws IOException {
        // Start every export at row 0, so that a previous export on this instance cannot skew
        // header placement or sheet roll-over.
        currentLine.set(0);
        long count = newQueryData(0, 1).count();
        // One thread per perThreadRow rows, clamped to [defaultThreadNum, maxThreadNum].
        int threadNum = (int) Math.min(maxThreadNum, Math.max(count / perThreadRow, defaultThreadNum));
        Set<Future<?>> submitSet = new HashSet<>(threadNum);
        for (int i = 0; i < count; i += BATCH_SIZE) {
            // Keep at most threadNum - 1 batches in flight before submitting the next one.
            waiteSubmit(submitSet, threadNum - 1);
            submitSet.add(executorService.submit(new DB2Excel(newQueryData(i, BATCH_SIZE), writer, process, count)));
        }
        // Wait for all batches, so the caller can safely close the writer afterwards.
        waiteSubmit(submitSet, 0);
        // NOTE(review): the original had an empty block here for 0 < count <= autoSizeThresholdNum
        // (only a garbled "// ();" remained), presumably a final column auto-size. TODO confirm.
    }

    /** Creates the query implementation that matches how this instance was constructed (JPA or SQL). */
    private QueryData newQueryData(int offset, int limit) {
        return Objects.nonNull(jpaSpecificationExecutor) ? new JpaQueryData(offset, limit) : new SqlQueryData(offset, limit);
    }
}