// Java多线程大批量同步数据(分页) — multithreaded bulk data synchronization with pagination
package com.github.admin.controller.loans;
import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.github.admin.model.entity.CaseCheckCallRecord;
import com.github.admin.model.entity.duyan.DuyanCallRecordDetail;
import com.github.admin.model.entity.loans.CaseCallRemarkRecord;
import com.github.admin.service.duyan.DuyanCallRecordDetailService;
import com.github.admin.service.loans.CaseCallRemarkRecordService;
import com.github.common.constant.MongodbConstant;
import com.github.common.util.DingDingMsgSendUtils;
import com.github.common.util.ListUtils;
import com.github.common.util.Response;
import com.github.common.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
/**
 * Controller that copies historical call-record rows into MongoDB using a thread pool.
 *
 * <p>A request to {@code /demo/syncHistoryData} only enqueues the job and returns immediately.
 * The job counts the matching rows, splits them into fixed-size pages, queries the pages in
 * small batches on the worker pool, and hands every non-empty page to an asynchronous save task.
 *
 * @author songfayuan
 * @date 2019-09-26 15:38
 */
@Slf4j
@RestController
@RequestMapping("/demo")
public class SynchronizeHistoricalDataController implements DisposableBean {

    /** Number of rows requested by each page query. */
    private static final int PAGE_SIZE = 1000;

    /** Number of page queries submitted per {@code invokeAll} round, to bound the results held in memory. */
    private static final int TASKS_PER_BATCH = 10;

    /** Fixed-size worker pool that runs the page queries and the MongoDB save tasks. */
    private final ExecutorService executor = Executors.newFixedThreadPool(10, "SynchronizeHistoricalDataController");

    /**
     * Dedicated single thread for the orchestration in {@link #logicHandler(Map)}.
     * The orchestration blocks in {@code invokeAll}; if it ran on the worker pool itself,
     * enough concurrent requests could occupy every worker while their page queries sat in
     * the queue forever. Requests are therefore serialized here instead.
     */
    private final ExecutorService coordinator = Executors.newFixedThreadPool(1, "SynchronizeHistoricalDataCoordinator");

    // The original placeholder was empty ("${}"), which Spring cannot resolve.
    // NOTE(review): assumes the value is meant to be the active Spring profile — confirm the property name.
    @Value("${spring.profiles.active:unknown}")
    private String profile;

    @Autowired
    private DuyanCallRecordDetailService duyanCallRecordDetailService;

    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    private CaseCallRemarkRecordService caseCallRemarkRecordService;

    /**
     * Starts the historical call-record synchronization in the background.
     *
     * <p>NOTE(review): {@code params} carries no binding annotation, so Spring MVC presumably
     * supplies its model map here rather than the request parameters — confirm the intent.
     *
     * @param params values passed through to every page query task
     * @return a success response as soon as the job has been enqueued
     * @throws Exception declared for interface compatibility
     */
    @GetMapping("/syncHistoryData")
    public Response syncHistoryData(Map<String, Object> params) throws Exception {
        coordinator.execute(() -> {
            try {
                logicHandler(params);
            } catch (Exception e) {
                // Pass the message for the placeholder and the throwable last so the stack trace is logged.
                log.warn("多线程同步稽查通话记录历史数据才处理异常,errMsg = {}", e.getMessage(), e);
                DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,多线程同步稽查通话记录历史数据才处理异常,errMsg = " + e);
            }
        });
        return Response.success("请求成功");
    }

    /**
     * Counts the rows to synchronize, builds one query task per page and processes the tasks
     * in batches of {@link #TASKS_PER_BATCH}.
     *
     * <p>A failure in one page query is reported and skipped; the other pages of the same batch
     * are still stored. An interrupt stops the whole synchronization.
     *
     * @param params values passed through to every page query task
     * @throws Exception if counting the rows or building the tasks fails
     */
    private void logicHandler(Map<String, Object> params) throws Exception {
        int count = this.duyanCallRecordDetailService.selectCount(new EntityWrapper<DuyanCallRecordDetail>()
                .eq("is_delete", 0)
                .eq("platform_type", 1));
        DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,本次需要同步" + count + "条历史稽查通话记录数据。");

        // Number of pages, rounded up so a trailing partial page is included.
        int times = count / PAGE_SIZE;
        if (count % PAGE_SIZE != 0) {
            times = times + 1;
        }

        List<Callable<List<DuyanCallRecordDetail>>> tasks = new ArrayList<>();
        int offset = 0;
        for (int i = 0; i < times; i++) {
            tasks.add(new ThredQuery(duyanCallRecordDetailService, params, offset, PAGE_SIZE));
            offset = offset + PAGE_SIZE;
        }

        // Run the page queries a few at a time so that only a handful of result lists exist at once.
        List<List<Callable<List<DuyanCallRecordDetail>>>> batches = ListUtils.partition(tasks, TASKS_PER_BATCH);
        for (List<Callable<List<DuyanCallRecordDetail>>> batch : batches) {
            if (CollectionUtils.isEmpty(batch)) {
                continue;
            }

            List<Future<List<DuyanCallRecordDetail>>> futures;
            try {
                futures = executor.invokeAll(batch);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                reportBatchFailure(e);
                return;
            } catch (Exception e) {
                reportBatchFailure(e);
                continue;
            }

            for (Future<List<DuyanCallRecordDetail>> future : futures) {
                try {
                    List<DuyanCallRecordDetail> page = future.get();
                    if (CollectionUtils.isNotEmpty(page)) {
                        submitSave(page);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    reportBatchFailure(e);
                    return;
                } catch (Exception e) {
                    // Only this page is lost; keep going with the remaining results of the batch.
                    reportBatchFailure(e);
                }
            }
        }
    }

    /**
     * Logs a batch-processing failure and forwards it to the DingDing group.
     *
     * @param e the failure to report
     */
    private void reportBatchFailure(Exception e) {
        log.warn("任务拆分执行异常,errMsg = {}", e.getMessage(), e);
        DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,任务拆分执行异常,errMsg = " + e);
    }

    /**
     * Queues one page of records for asynchronous storage on the worker pool.
     *
     * @param page non-empty page of records to store
     */
    private void submitSave(List<DuyanCallRecordDetail> page) {
        executor.execute(() -> {
            log.info("异步存储MongoDB开始:线程{}拆分处理开始...", Thread.currentThread().getName());
            try {
                saveMongoDB(page);
            } catch (Exception e) {
                // Without this catch the failure would only reach the thread's uncaught-exception handler.
                log.warn("异步存储MongoDB异常,errMsg = {}", e.getMessage(), e);
                return;
            }
            log.info("异步存储MongoDB结束:线程{}拆分处理结束...", Thread.currentThread().getName());
        });
    }

    /**
     * Stores the given records in the MongoDB call-record collection.
     *
     * <p>A record whose {@code callUuid} already exists in the collection is skipped. For every
     * other record the matching remark record (if any) supplies the callee name.
     *
     * @param duyanCallRecordDetailList records to store
     */
    private void saveMongoDB(List<DuyanCallRecordDetail> duyanCallRecordDetailList) {
        for (DuyanCallRecordDetail detail : duyanCallRecordDetailList) {
            // Skip records that are already present; an existence check avoids loading the documents.
            org.springframework.data.mongodb.core.query.Query query = new org.springframework.data.mongodb.core.query.Query();
            query.addCriteria(Criteria.where("callUuid").is(detail.getCallUuid()));
            if (mongoTemplate.exists(query, CaseCheckCallRecord.class, MongodbConstant.CASE_CHECK_CALL_RECORD)) {
                log.warn("call_uuid = {}在MongoDB已经存在数据,后面数据将被舍弃...", detail.getCallUuid());
                continue;
            }

            // Look up the remark record entered for the same call.
            CaseCallRemarkRecord remark = this.caseCallRemarkRecordService.selectOne(new EntityWrapper<CaseCallRemarkRecord>()
                    .eq("is_delete", 0)
                    .eq("call_uuid", detail.getCallUuid()));

            CaseCheckCallRecord record = new CaseCheckCallRecord();
            BeanUtils.copyProperties(detail, record);
            record.setCollectorUserId(detail.getUserId());
            if (remark != null) {
                record.setCalleeName(remark.getContactName());
            }

            log.info("正在存储数据到MongoDB:{}", record);
            this.mongoTemplate.save(record, MongodbConstant.CASE_CHECK_CALL_RECORD);
        }
    }

    /**
     * Shuts down both executors when the bean is destroyed; already submitted tasks still run.
     *
     * @throws Exception declared by {@link DisposableBean#destroy()}
     */
    @Override
    public void destroy() throws Exception {
        coordinator.shutdown();
        executor.shutdown();
    }
}
/**
 * Task that loads one page of call-record rows through the supplied service.
 *
 * <p>The page is selected with a raw {@code limit offset, num} clause appended to the query.
 * NOTE(review): the query has no explicit ordering, so page contents rely on the database's
 * default row order — confirm that this is stable across the concurrent page queries.
 */
class ThredQuery implements Callable<List<DuyanCallRecordDetail>> {

    /** Service used to run the page query. */
    private final DuyanCallRecordDetailService service;

    /** Caller-supplied values; stored but not read by {@link #call()}. */
    private final Map<String, Object> params;

    /** Index of the first row of the page. */
    private final int offset;

    /** Maximum number of rows in the page. */
    private final int num;

    /**
     * @param myService service used to run the page query
     * @param params    caller-supplied values kept with the task
     * @param offset    index of the first row of the page
     * @param num       maximum number of rows in the page
     */
    public ThredQuery(DuyanCallRecordDetailService myService, Map<String, Object> params, int offset, int num) {
        this.service = myService;
        this.params = params;
        this.offset = offset;
        this.num = num;
    }

    /**
     * Queries the rows with {@code is_delete = 0} and {@code platform_type = 1} that fall in this page.
     *
     * @return whatever the service returns for the page query
     * @throws Exception if the query fails
     */
    @Override
    public List<DuyanCallRecordDetail> call() throws Exception {
        String limitClause = "limit " + offset + ", " + num;
        EntityWrapper<DuyanCallRecordDetail> wrapper = new EntityWrapper<>();
        wrapper.eq("is_delete", 0);
        wrapper.eq("platform_type", 1);
        wrapper.last(limitClause);
        return service.selectList(wrapper);
    }
}