1、首先了解Java中为线程池定义的接口(ExecutorService),下面是创建线程池的四种常用方式:
// Cached thread pool: idle threads are reclaimed when the pool is larger than needed;
// when no thread can be reused a new one is created.
// NOTE(review): the factory call on the right-hand side looks stripped by text extraction —
// presumably Executors.newCachedThreadPool(); confirm against the original article.
private ExecutorService executorService = ();
// Fixed-size thread pool: caps the number of concurrent threads; extra tasks wait in a queue.
// NOTE(review): presumably Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()) — confirm.
private ExecutorService executorService = (().availableProcessors());
// Fixed-size thread pool that supports delayed and periodic task execution.
// NOTE(review): presumably Executors.newScheduledThreadPool(10) — confirm.
private ExecutorService executorService = (10);
// Single-threaded pool: one worker thread runs every task, so tasks execute in the
// specified order (FIFO, LIFO, priority).
// NOTE(review): presumably Executors.newSingleThreadExecutor() — confirm.
private ExecutorService executorService = ();
2、编写一个实现Runnable接口的内部类
// Minimal example task: a Runnable that carries the SQL statement it should work on.
// NOTE(review): several statements below lost their receiver/target during text extraction
// (the assignment in the constructor and the logging/stack-trace calls), and the closing
// brace of the class is missing from this excerpt.
class SyncDatabase implements Runnable {
// SQL statement handled by this task.
private String selectSql;
public SyncDatabase(String selectSql) {
// presumably "this.selectSql = selectSql;" — left-hand side is missing, confirm
= selectSql;
}
@Override
public void run() {
try {
// Announces the start of the task together with its SQL (call target is missing).
("开始任务:" + selectSql);
} catch (Exception ex) {
();
}finally {
}
// Announces completion (call target is missing).
("------------------------执行完成--------------------------------");
}
3、将任务提交给线程池执行
// The task is executed here.
// NOTE(review): the call target was lost in extraction — presumably something like
// executorService.execute(new SyncDatabase("selectSql")); confirm.
("selectSql");
--------------------------------------------------------------------------------------------------------------------------------
最后给出一份完整的代码(实现的是一个插入或更新数据的同步任务):
package ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import .slf4j.Slf4j;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import .*;
import ;
import ;
import ;
@Slf4j
@Service
public class ConversionTaskService extends ServiceSupportSync<ConversionTaskMapper, ConversionTask> implements
ConversionTaskServiceI {
// Thread pool held by the service; its size is derived from availableProcessors().
// NOTE(review): the factory call was stripped by text extraction — presumably
// Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); confirm.
private ExecutorService executorService = (().availableProcessors());
// Disabled alternatives that used to be listed here: a cached pool, a fixed-size pool,
// a scheduled pool of size 10 and a single-thread pool.
// Scheduler used together with Scheduler.DEFAULT_GROUP when tasks are started/stopped.
@Resource
private Scheduler scheduler;
// Service for task conditions (its calls are garbled in this excerpt).
@Resource
ConditionsServiceI conditionsService;
// Service for looking up DataSource records (its calls are garbled in this excerpt).
@Resource
DataSourceServiceI dataSourceService;
// MQ producer (its calls are garbled in this excerpt).
@Resource
private Producer producer;
/**
 * Processes the task identified by {@code taskId} in normal (non-test) mode by
 * delegating to {@code sendMqTaskTest} with {@code testCase = false}.
 *
 * @param taskId id of the conversion task
 * @throws Exception declared by the interface contract
 */
@Override
public void sendMqTask(Long taskId) throws Exception {
    final boolean testCase = false;
    sendMqTaskTest(taskId, testCase);
}
/**
* Loads the task, determines its source query and hands the query to sendMqMesage,
* paging the query in blocks of 3000 rows when the row count exceeds 3000.
* NOTE(review): many call receivers in this method were lost during text extraction, so
* this description is limited to what the remaining tokens show.
*
* @param taskId id of the conversion task
* @param testCase false: send to MQ; true: do not send to MQ
* @return always {@code false} in the code shown — NOTE(review): confirm whether a
* success flag was intended
*/
@Override
public boolean sendMqTaskTest(Long taskId, boolean testCase) {
// 1. Load the task definition.
ConversionTask task = getById(taskId);
// 2. Load the conditions and check whether any have to be added to the query.
List<String> condList = (());
String dataSourceSql = ();
if (() > 0) {
dataSourceSql = ((), condList);
}
// Resolve the source data source of the table.
DataSource dataSource = (());
JdbcTemplate jdbcTemplate = ().getJdbcTemplate(dataSource);
// Count the rows first so that a very large single-table sync can be paged.
Integer count = findCountSql(jdbcTemplate, dataSourceSql);
Integer pageSize = 500;
if (count > 3000) {
// More than 3000 rows: run the query page by page (3000 rows per page).
Integer timeNumber = (count / 3000) + 1;
int i = 1;
while (i <= timeNumber) {
String pageSql = getPageSql((), dataSourceSql, i, 3000);
i++;
sendMqMesage(jdbcTemplate, pageSql, task, count, pageSize, testCase);
// In test mode only the first page is processed.
if (testCase) {
break;
}
}
} else {
// Small result: send without paging, unless there is nothing to send.
if (count != 0) {
sendMqMesage(jdbcTemplate, dataSourceSql, task, count, pageSize, testCase);
}
}
return false;
}
/**
 * Convenience overload: same as the six-argument {@code sendMqMesage} with
 * {@code testCase} fixed to {@code false}.
 */
private void sendMqMesage(JdbcTemplate jdbcTemplate, String pageSql, ConversionTask conversionTask, Integer maxSize, int pageSize) {
    final boolean testCase = false;
    sendMqMesage(jdbcTemplate, pageSql, conversionTask, maxSize, pageSize, testCase);
}
/**
* Processes the task data and sends it to MQ, split into chunks of {@code pageSize} rows.
* NOTE(review): call receivers were lost during text extraction; the row list is
* presumably the result of running {@code pageSql} on {@code jdbcTemplate} — confirm.
*
* @param jdbcTemplate template for the source database
* @param pageSql query whose rows are sent
* @param conversionTask task the rows belong to
* @param maxSize total row count, passed through to the per-chunk overload
* @param pageSize number of rows per chunk
* @param testCase when true the row list is replaced by an empty list
*/
private void sendMqMesage(JdbcTemplate jdbcTemplate, String pageSql, ConversionTask conversionTask, Integer maxSize, int pageSize, boolean testCase) {
List<Map<String, Object>> map = (());
("taskId:{},size:{}", () + "", () + "");
if (testCase) {
// Test run: do not forward any message to the sync side.
map = new ArrayList<>();
}
if (() > 0) {
// Number of chunks (integer division plus one).
Integer timeNumber = (() / pageSize) + 1;
if (timeNumber == 1) {
// Single chunk: index 0 marks "not split".
sendMqMesage(map, conversionTask, 0, maxSize, pageSql);
} else if (timeNumber > 1) {
// Several chunks: send them one by one, numbered from 1.
Integer mapIndex = 1;
List<List<Map<String, Object>>> mapList = subList(map, pageSize);
for (List<Map<String, Object>> maps : mapList) {
sendMqMesage(maps, conversionTask, mapIndex, maxSize, pageSql);
mapIndex++;
}
}
}
}
/**
* Sends one chunk of rows to MQ.
* NOTE(review): most call receivers were lost during text extraction; the final statement
* is presumably the producer call — confirm.
*
* @param map rows of this chunk
* @param task task the rows belong to
* @param mapIndex chunk number; 0 means the data was not split, so no suffix is added to the tag
* @param maxSize total row count, used in the title and tag text
* @param dataSourceSQL query that produced the rows, passed along with the message
*/
private void sendMqMesage(List<Map<String, Object>> map, ConversionTask task, Integer mapIndex, Integer maxSize, String dataSourceSQL) {
if (() > 0) {
// Map<String, Object> maps = (0);
// map = new ArrayList<>();
// (maps);
// Message body built from the rows with the date pattern "yyyy-MM-dd HH:mm:ss".
String bods = (map, "yyyy-MM-dd HH:mm:ss", );
DataSource dbSource = (());
MqTag mqTag = (().toLowerCase());
// Human-readable title: task, table and "records in this chunk / total".
String title = "任务:" + () + ",操作表:" + () + "操作记录数/总数:" + () + "/" + maxSize;
String tag = () + ":" + ();
if (mapIndex != 0) {
// Chunked send: append "-<chunk>(<size>/<total>)" to the tag.
tag += "-" + mapIndex + "(" + () + "/" + maxSize + ")";
}
(bods, title, (), tag, (), dataSourceSQL);
}
}
/**
* Starts the tasks returned by findList for the ConversionTask.TASK_NOT_UPDATE filter by
* calling startTask for each of them.
* Original author notes: the logic has to be completed by the implementer; a uuid is used
* as taskCode to guarantee uniqueness; before starting, the database should be checked to
* see whether the task is already running (if so it must not be started again); after a
* successful start the task status in the database should be set to "running".
*/
@Override
public void start() {
("启动同步任务.........");
List<ConversionTask> taskList = findList(().isEnable(ConversionTask.TASK_NOT_UPDATE).build());
(conversionTask -> {
startTask(conversionTask);
});
("成功作业成功,启动了:" + () + "个任务!");
}
/**
* Stops all tasks: calls endTask for every task returned by findList for the
* ConversionTask.TASK_NOT_UPDATE filter.
*/
@Override
public void end() {
List<ConversionTask> taskList = findList(().isEnable(ConversionTask.TASK_NOT_UPDATE).build());
(conversionTask -> {
endTask(conversionTask);
});
}
/**
 * Starts scheduling for the task with the given id.
 *
 * @param taskId task identifier; must be unique because it is needed again to stop the task
 * @return the status message produced by the {@code ConversionTask} overload
 */
@Override
public String startTask(String taskId) {
    ConversionTask task = getById(taskId);
    return startTask(task);
}
/**
* Builds a job detail and a cron trigger for the task and registers them, after first
* stopping any previous registration via endTask.
* NOTE(review): nearly all setter/receiver tokens were lost during text extraction, so the
* individual statements below cannot be described precisely.
*
* @param task task to schedule
* @return a "start failed" message by default, replaced by a "start succeeded" message
* when the try block completes
*/
@Override
public String startTask(ConversionTask task) {
String msg = () + "任务启动失败";
// Cron expression that triggers the task.
String cronExpress = ();
// Certain batch tasks must not be started (cronExpress == null).
if ((cronExpress)) {
try {
JobDetailFactoryBean jobDetailFactoryBean = new JobDetailFactoryBean();
(().toString());
(Scheduler.DEFAULT_GROUP);
// The class that performs the job's work.
();
// Parameters the job needs are passed through the job data map.
(getJobDataMap(objectToMap(task)));
();
CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
(().toString());
(cronExpress);
(Scheduler.DEFAULT_GROUP);
("cron_" + ());
// Stop first, then start (guards against errors when the job already exists).
endTask(task);
();
((), ());
msg = () + "任务启动成功";
(msg);
} catch (Exception e) {
// Failure keeps the default "start failed" message.
();
}
}
return msg;
}
/**
 * Stops scheduling for the task with the given id.
 *
 * @param taskId identifier of the task to stop
 */
@Override
public void endTask(String taskId) {
    ConversionTask task = getById(taskId);
    endTask(task);
}
/**
* Stops the given task using a key built from a task value and Scheduler.DEFAULT_GROUP.
* A SchedulerException is caught and reported; it is not rethrown.
* NOTE(review): the scheduler call itself is garbled — presumably a deleteJob on a JobKey; confirm.
*
* @param task task to stop
*/
@Override
public void endTask(ConversionTask task) {
try {
((().toString(), Scheduler.DEFAULT_GROUP));
// (() + "任务停止成功");
("{}任务停止成功", ());
} catch (SchedulerException e) {
();
("{}任务停止失败", ());
}
}
/**
* Converts a HashMap into a JobDataMap by walking its keys with an iterator.
* NOTE(review): receivers are garbled; presumably every key/value pair of {@code params}
* is copied into the result — confirm.
*
* @param params source key/value pairs
* @return the newly created JobDataMap
*/
private JobDataMap getJobDataMap(Map<String, Object> params) {
JobDataMap jdm = new JobDataMap();
Set<String> keySet = ();
Iterator<String> it = ();
while (()) {
String key = ();
// ("key:" + key);
(key, (key));
}
return jdm;
}
/**
* Converts the fields of an object and their values into a Map, walking up the class
* hierarchy so that inherited fields are included.
* NOTE(review): the reflection calls are garbled; presumably getDeclaredFields(),
* getSuperclass(), setAccessible(true) and Field.get(obj) — confirm.
*
* @param obj object whose fields are read
* @return map from field name to field value
* @throws IllegalAccessException if a field value cannot be read
*/
private Map<String, Object> objectToMap(Object obj) throws IllegalAccessException {
Map<String, Object> map = new HashMap<>();
Class<?> clazz = ();
List<Field> fieldList = new ArrayList<>();
// A null class means the top of the hierarchy (above Object) has been reached.
while (clazz != null) {
((()));
// Move on to the parent class.
clazz = ();
}
for (Field field : fieldList) {
(true);
String fieldName = ();
Object value = (obj);
(fieldName, value);
}
return map;
}
/**
* Turns a data query into a counting query by wrapping it as
* "SELECT COUNT(*) FROM ( ... ) TABLE_COUNT" when the text starts with SELECT
* (case-insensitive); any other text is returned unchanged.
* NOTE(review): the receiver in front of toUpperCase() is garbled — presumably the
* trimmed {@code sql}; confirm.
*
* @param sql the data query
* @return the counting query, or {@code sql} itself when it is not a SELECT
*/
private String getCountSql(String sql) {
if (().toUpperCase().startsWith("SELECT")) {
StringBuffer sbCount = new StringBuffer("SELECT COUNT(*) FROM ( ");
(sql);
(" ) TABLE_COUNT");
return ();
}
return sql;
}
/**
* Builds the SQL text that selects primary-key column names (constraint_type = 'P') for a
* table from user_cons_columns / user_constraints. Despite the method name, the value
* built here is SQL text, not a key name.
* NOTE(review): {@code tableName} is concatenated into the SQL literal as-is — confirm it
* never comes from untrusted input.
*
* @param tableName table whose primary-key columns are looked up
* @return the SQL text (return expression is garbled, presumably the buffer's content)
*/
private String getKeyName(String tableName) {
StringBuffer sbKeySql = new StringBuffer(" SELECT column_name from user_cons_columns cu ");
(",user_constraints au where cu.constraint_name = au.constraint_name and au.constraint_type = 'P' ");
(" and au.table_name ='" + tableName + "' ");
return ();
}
/**
* Wraps a query so that it returns one page of rows, using database-specific syntax.
* Statements that fail the SELECT check, and drivers without a case below, get the
* original SQL back unchanged.
*
* @param dbType database driver type
* @param sql query producing the full result set
* @param pageNo page number (the callers in this file start at 1)
* @param pageSize rows per page
* @return the paged SQL, or the original {@code sql}
*/
private String getPageSql(String dbType, String sql, int pageNo, int pageSize) {
String pageSql = sql;
// NOTE(review): the receiver is garbled; "> 0" is false when "SELECT" sits at index 0 —
// confirm whether ">= 0" (or a startsWith check) was intended.
if (().indexOf("SELECT") > 0) {
// Row-number window of the requested page: (startPage, endPage].
int endPage = pageNo * pageSize;
int startPage = endPage - pageSize;
DBDriver driver = (dbType);
switch (driver) {
case ORACLE:
// ROWNUM/ROWID based paging template; {0} and {1} receive the query,
// {2} the end row and {3} the start row.
// NOTE(review): parts of the template text ("RN, FROM", "WHERE = ") look truncated
// by extraction — confirm against the original.
StringBuffer sbCount = new StringBuffer(" SELECT T1.* ");
// The table data is substituted here (a table name works as well).
("FROM ({0}) T1,");
("(SELECT RID FROM (SELECT ROWNUM RN, FROM (SELECT ROWID RID FROM ({1}) ) T");
// End row.
(" WHERE ROWNUM <= {2})");
// Start row.
(" WHERE RN >{3}) T2 WHERE = ");
pageSql = ((), sql, sql, endPage + "", startPage + "");
break;
case MYSQL:
case KINGBASE:
// NOTE(review): this emits "LIMIT <startPage> OFFSET <endPage>"; LIMIT normally takes
// the page size and OFFSET the start row, so the operands look swapped — confirm.
// There is also no break here; control falls into the empty default.
pageSql += " LIMIT " + startPage + " OFFSET " + endPage;
default:
}
}
return pageSql;
}
/*
* 将一个list均分成n个list,主要通过偏移量来实现的
*/
public static <T> List<List<T>> subList(List<T> list, int toIndex) {
List<List<T>> result = new ArrayList<>();
int listSize = ();
for (int i = 0; i < (); i += toIndex) {
if (i + toIndex > listSize) {
toIndex = listSize - i;
}
List newList = (i, i + toIndex);
(newList);
}
return result;
}
/**
* Returns the row count of a query by running its counting form (see getCountSql).
* NOTE(review): the query call is garbled — presumably jdbcTemplate.queryForObject(...,
* Integer.class); confirm.
*
* @param jdbcTemplate template for the database to query
* @param dataSourceSql the data query to count
* @return the count, or 0 when any exception occurs
*/
private Integer findCountSql(JdbcTemplate jdbcTemplate, String dataSourceSql) {
try {
Integer count = (getCountSql(dataSourceSql), );
return count;
} catch (Exception e) {
// NOTE(review): the exception is swallowed without logging, so a failed count is
// indistinguishable from an empty table — confirm this is intended.
}
return 0;
}
/**
* Compares per-table row counts between two databases and reports which tables match and
* which differ. Three hard-coded ZYC tables are skipped as too large to count.
* NOTE(review): most call receivers (and parts of the SQL literals) were lost during text
* extraction, so the description is limited to what the remaining tokens show.
*
* @param dataSourceId id of the source data source
* @param dbId id of the target (operated) data source
* @param dataUser optional owner used in the source table query
* @param dbUser optional owner used in the target table query
*/
@Override
public void contrastDatabase(Long dataSourceId, Long dbId, String dataUser, String dbUser) {
DataSource dataSource = (dataSourceId);
DataSource sdataSource = (dbId);
// String sourceSql = "select t.table_name from user_tables t";
String sourceSql = "select ,t.table_name,t.num_rows from all_tables t ";
// Template for the source database.
JdbcTemplate jdbcTemplate = ().getJdbcTemplate(dataSource);
// Template for the target (operated) database.
JdbcTemplate sjdbcTemplate = ().getJdbcTemplate(sdataSource);
String dataSourceSql = sourceSql + " where = '" + () + "'";
String sdataSourceSql = sourceSql + " where = '" + () + "'";
if ((dataUser)) {
dataSourceSql = sourceSql + " where = '" + dataUser + "'";
}
// NOTE(review): this condition tests dataUser but the body applies dbUser; the
// five-argument syncDatabase tests dbUser at the same spot — looks like a copy/paste slip, confirm.
if ((dataUser)) {
sdataSourceSql = sourceSql + " where = '" + dbUser + "'";
}
List<Map<String, Object>> map = (dataSourceSql);
// NOTE(review): smap is not read again in the code shown.
List<Map<String, Object>> smap = (sdataSourceSql);
// Messages for tables with equal counts (msgSt) and differing counts (msgbt) —
// presumably, judging by the report headers printed at the end; confirm.
List<String> msgSt = ();
List<String> msgbt = ();
// Comma-separated names of the tables whose counts differ.
String tableNames = "";
for (Map<String, Object> objMap : map) {
String owner = ("OWNER").toString();
String tableName = owner + "." + ("table_name").toString();
("表名:" + tableName);
// Tables considered too large to count are skipped.
switch (tableName) {
case "ZYC.T_TS_TZSBXX_TJY_TBJLB":
case "ZYC.T_TS_QYXX_TJY_TBJLB":
case "ZYC.T_TS_TZSBXXB_XGJLB":
(tableName + "表数据过大,不统计!");
continue;
default:
}
StringBuilder sbSelect = new StringBuilder("SELECT count(1) FROM ");
(tableName);
Integer count1 = ((), );
try {
Integer count2 = ((), );
String msg = () + ",tableName:" + tableName + ",count:" + count1 + "------" + () + ",tableName:" + tableName + ",count:" + count2;
if ((count2)) {
(msg);
} else {
// Counts differ: remember the table name and record the message.
if ((tableNames)) {
tableNames += ",";
}
tableNames += tableName;
(msg);
}
} catch (Exception ex) {
// Counting on the second database failed; the message reports "table not found".
String msg = () + ",tableName:" + tableName + ",count:" + count1 + "------" + () + ",tableName:" + tableName + "没找到表!";
(msg);
}
}
// Report: matching tables first, then differing tables, then the collected table names.
("-----相同数据------");
().forEach(::println);
("-----相同数据 --end------");
("-----不同数据------");
().forEach(::println);
("-----不同数据 --end------");
("-----执行完成------");
(tableNames);
// syncDatabase(taskId, tableNames);
}
/**
* Calls updateConversionAnnotation with isUpdate = 0 for every task returned by findList
* for the ConversionTask.TASK_NOT_UPDATE filter.
*/
@Override
public void syncTableAnnotation() {
List<ConversionTask> taskList = findList(().isEnable(ConversionTask.TASK_NOT_UPDATE).build());
(conversionTask -> {
updateConversionAnnotation((),conversionTask,0);
});
}
/**
* Looks up the table comment for the task's table in the given data source and, when a
* comment is found, stores it on the task via updateById. Otherwise it retries
* recursively: a call with isUpdate 0 retries with isUpdate 1, and a call with isUpdate 1
* retries with the hard-coded data source id 202110250000000005L and isUpdate 2.
* NOTE(review): most receivers are garbled; also, a call with isUpdate 1 performs the
* final retry even when a comment was already stored — confirm that is intended.
*
* @param dbId id of the data source to query
* @param conversionTask task whose table comment is updated
* @param isUpdate retry stage: 0 first attempt, 1 second, 2 last (no further retry)
*/
public void updateConversionAnnotation(Long dbId,ConversionTask conversionTask,Integer isUpdate){
DataSource dataSource = (dbId);
String sourceSql = "select * from user_tab_comments WHERE TABLE_name =";
String tableName = ();
// Strip the part up to and including the "." from the table name (presumably an owner prefix).
if((".")> 0){
tableName = ((".")+1);
}
sourceSql += "'"+tableName+"'";
// One database type uses a different statement (the original comment says MySQL;
// the case label itself is garbled).
switch (()) {
case :
sourceSql = "select TABLE_NAME,TABLE_COMMENT as COMMENTS from INFORMATION_SCHEMA.TABLES ";
sourceSql += "WHERE TABLE_TYPE = 'BASE TABLE' AND TABLE_SCHEMA ='"+()+"'" ;
sourceSql += " AND TABLE_NAME ='"+tableName+"'";
break;
default:
}
JdbcTemplate jdbcTemplate = ().getJdbcTemplate(dataSource);
List<Map<String, Object>> map = (sourceSql);
(sourceSql);
for (Map<String, Object> map1: map) {
String comments = ("COMMENTS").toString();
(tableName+":"+comments);
if((comments)){
// Comment check passed: set it on the task and persist.
(comments);
updateById(conversionTask);
}else{
if(isUpdate == 0){
updateConversionAnnotation((),conversionTask,1);
}
}
}
// No rows at all: retry once with another data source id.
if(() == 0 ){
if(isUpdate == 0){
updateConversionAnnotation((),conversionTask,1);
}
}
// Last retry against the hard-coded data source id.
if(isUpdate == 1){
updateConversionAnnotation(202110250000000005L,conversionTask,2);
}
}
/**
* Compares the data volume of two databases for a task. The comparison only runs when a
* task value equals "整库同步" (whole-database sync); it then delegates to the
* four-argument contrastDatabase with both user arguments null.
*
* @param taskId id of the conversion task
* @author xuzj
* @Date: 2021-0-28 8:52
**/
@Override
public void contrastDatabase(Long taskId) {
// 1. Load the task definition.
ConversionTask task = getById(taskId);
// The value read here is the configured instance name.
if (().equals("整库同步")) {
contrastDatabase((), (), null, null);
}
}
/**
* Synchronises tables between two manually chosen data sources. For each source table
* that passes isSyncTable and isTableThere, the rows are counted and, when there are any,
* one SyncDatabase task per page of 1000 rows is created.
* NOTE(review): most call receivers were lost during text extraction; the SyncDatabase
* tasks are presumably submitted to executorService — confirm.
*
* @param dataId id of the source data source
* @param dbId id of the target (operated) data source
* @param dataUser optional owner used in the source table query and as table-name prefix
* @param dbUser optional owner used in the target table query
* @param tableNames comma-separated tables to sync (see isSyncTable)
*/
@Override
public void syncDatabase(Long dataId, Long dbId, String dataUser, String dbUser, String tableNames) {
// Source data source.
DataSource dataSource = (dataId);
// Target (operated) data source.
DataSource sdataSource = (dbId);
String sourceSql = "select ,t.table_name,t.num_rows from all_tables t ";
// Template for the source database.
JdbcTemplate jdbcTemplate = ().getJdbcTemplate(dataSource);
// Template for the target database.
JdbcTemplate sjdbcTemplate = ().getJdbcTemplate(sdataSource);
String dataSourceSql = sourceSql + " where = '" + () + "'";
String sdataSourceSql = sourceSql + " where = '" + () + "'";
if ((dataUser)) {
dataSourceSql = sourceSql + " where = '" + dataUser + "'";
}
if ((dbUser)) {
sdataSourceSql = sourceSql + " where = '" + dbUser + "'";
}
List<Map<String, Object>> map = (dataSourceSql);
List<Map<String, Object>> smap = (sdataSourceSql);
for (Map<String, Object> objMap : map) {
String tableName = ("table_name").toString();
String owner = ("OWNER").toString();
if ((dataUser)) {
tableName = dataUser + "." + tableName;
}
if (("JCC")) {
tableName = owner + "." + tableName;
}
// Skip tables that were not requested.
if (!isSyncTable(tableName, tableNames)) {
continue;
}
// Remove the "<owner>." prefix before checking the target database.
String newTableName = (owner + "\\.", "");
if (!isTableThere(newTableName, smap)) {
("表" + tableName + "没有找到表!" + ());
continue;
}
// Integer rows = (("num_rows").toString());
StringBuffer countSql = new StringBuffer();
(" SELECT count(*) FROM " + tableName);
Integer rows = ((), );
if (rows > 0) {
// Process the data with multiple threads.
StringBuffer sbSql = new StringBuffer();
(" SELECT * FROM ");
(tableName);
(" ");
String keyNameSql = getKeyName(tableName);
List<String> keyNames = ();
try {
// Try several ways of finding key columns; fall back to getKeyNames.
keyNames = (tableName);
if (() == 0) {
keyNames = (keyNameSql, );
}
if (() == 0) {
keyNames = getKeyNames(tableName, jdbcTemplate, false);
}
} catch (DataAccessException e) {
keyNames = getKeyNames(tableName, jdbcTemplate, false);
}
// Page the query in blocks of 1000 rows.
// (The original comment mentions 3000 rows, but the code uses 1000.)
Integer timeNumber = (rows / 1000) + 1;
int i = 1;
while (i <= timeNumber) {
String pageSql = getPageSql((), (), i, 1000);
i++;
// Run the task for this page.
(new SyncDatabase(pageSql, jdbcTemplate, sjdbcTemplate, keyNames, tableName, true, ()));
}
}
}
}
/**
* Synchronises a whole database for a task. Runs only when a task value equals
* "整库同步" (whole-database sync); it then delegates to the five-argument syncDatabase
* with both user arguments null.
*
* @param taskId id of the conversion task
* @param tableNames comma-separated tables to sync, passed through unchanged
*/
@Override
public void syncDatabase(Long taskId, String tableNames) {
// 1. Load the task definition.
ConversionTask task = getById(taskId);
// The value read here is the configured instance name.
if (().equals("整库同步")) {
syncDatabase((),(),null,null,tableNames);
}
}
/**
* Checks whether a table exists in the operated database, given its table list.
* NOTE(review): the comparison call is garbled, so whether it is case-sensitive cannot be
* told from this excerpt.
*
* @param tableName table to look for
* @param smap rows of the operated database's table list; each row is read via "table_name"
* @return true when a row matches, false otherwise
*/
public boolean isTableThere(String tableName, List<Map<String, Object>> smap) {
// Scan the target's table list for the table.
for (Map<String, Object> map : smap) {
String tableName1 = ("table_name").toString();
if ((tableName1)) {
return true;
}
}
return false;
}
/**
* Finds primary-key column names for a table and, when {@code inserts} is true, builds an
* ALTER TABLE ... ADD CONSTRAINT ... PRIMARY KEY statement for them.
* NOTE(review): most receivers are garbled. In the code shown the statement is only built —
* the line that would run it is commented out. The condition
* "keyNames != null || ... > 0" also looks suspicious (it dereferences keyNames when it is
* null) — confirm whether "&&" was intended.
*
* @param tableName table to inspect
* @param jdbcTemplate template for the database that holds the table
* @param inserts whether to build the primary-key creation statement
* @return the key column names
*/
public List<String> getKeyNames(String tableName, JdbcTemplate jdbcTemplate, boolean inserts) {
List<String> keyNames = ();
StringBuffer sbSql = new StringBuffer(" SELECT T.COLUMN_NAME FROM USER_TAB_COLUMNS T WHERE T.TABLE_NAME=");
("'" + tableName + "'");
List<String> columnsList = ((), );
keyNames = (tableName);
if (() == 0) {
List<DBColumn> dbColumns = (tableName);
columnsList = ().map(p -> ()).collect(());
}
if (keyNames != null || () > 0) {
if (() > 0) {
// No primary key was found: fall back to a list of well-known id column names and
// pick those that exist among the table's columns.
("表" + tableName + "没有找到主键!" + ());
List<String> keyList = ();
("ID");
("SBLSH");
("COMVEHICLEID");
("VESSELID");
("CRANEID");
("BOILERID");
("ENTERTAINMENTID");
("SUPREGULARINSPECTINFOID");
("ELEVATORID");
for (String key : keyList) {
for (String key2 : columnsList) {
if ((key2)) {
(key2);
break;
}
}
}
}
if (inserts) {
StringBuffer sbKeySql = new StringBuffer();
try {
// Build the statement that adds the primary key.
(" ALTER TABLE ");
(tableName);
(" ADD CONSTRAINT ");
// Constraint name: "PK_<table without the ZYC. prefix>_ID".
String newTableName = ("ZYC.", "");
("PK_" + newTableName + "_ID");
// for (String keyName : keyNames) {
// ("_"+keyName);
// }
(" PRIMARY KEY(");
for (String keyName : keyNames) {
(keyName + ",");
}
// Drop the trailing comma.
(() - 1);
(")");
// (());
} catch (Exception ex) {
();
("表" + tableName + "创建主键失败:" + ());
}
}
}
return keyNames;
}
/**
* Checks whether a table is among the tables selected for synchronisation.
* NOTE(review): the guard and comparison calls are garbled; presumably an empty
* {@code tableS} means "no filter" (the else branch returns true) — confirm.
*
* @param tableName table to check
* @param tableS comma-separated list of selected table names
* @return true when the table is selected or no selection applies, false otherwise
*/
public boolean isSyncTable(String tableName, String tableS) {
if ((tableS)) {
String[] tabls = (",");
for (String tabName : tabls) {
if ((tableName)) {
return true;
}
}
} else {
return true;
}
return false;
}
/**
* Runnable that copies the rows selected by one (paged) query from a source database into
* a target table.
* NOTE(review): most receivers and assignment targets in this class were lost during text
* extraction; the comments describe only what the remaining tokens show.
*/
class SyncDatabase implements Runnable {
// Query that selects the rows to copy.
private String selectSql;
// Template for the source database.
private JdbcTemplate jdbcTemplateYuan;
// Template for the database the rows are saved to.
private JdbcTemplate jdbcTemplateSave;
// Key column names used to match existing rows.
private List<String> keyNames;
// Target table name.
private String tableName;
// Insert-only flag (see the note in run()).
private boolean isInsert;
// Database type of the target.
private String dbType;
private HandleMessage handle = new HandleMessage();
public SyncDatabase(String selectSql, JdbcTemplate jdbcTemplateYuan, JdbcTemplate jdbcTemplateSave, List<String> keyNames, String tableName, boolean isInsert, String dbType) {
// presumably "this.<field> = <parameter>;" for each line — left-hand sides are missing
= selectSql;
= jdbcTemplateYuan;
= jdbcTemplateSave;
= keyNames;
= tableName;
= isInsert;
= dbType;
}
/**
* Reads the rows for selectSql and inserts them; when the insert throws, falls back to
* isInsertList, which inserts only rows missing from the target. Exceptions are caught
* and not rethrown.
*/
@Override
public void run() {
try {
("开始任务:" + selectSql);
List<Map<String, Object>> map = (selectSql);
("开始操作:" + () + ",主键:" + ());
JSONArray updateList = new JSONArray();
JSONArray insertList = new JSONArray();
// Compare-and-insert only (disabled):
// isInsertList(keyNames, map, tableName, jdbcTemplateSave, handle, dbType);
// NOTE(review): the flag is forced to true here, overriding the constructor argument,
// so the else branch below is unreachable — confirm this is intended.
isInsert = true;
// Insert-only mode: queue every row for insertion.
if (isInsert) {
for (int i = 0; i < (); i++) {
((i));
}
} else {
// Update existing rows and insert new ones.
updateInsertList(keyNames,map,tableName,jdbcTemplateSave,handle,dbType);
}
try {
(jdbcTemplateSave, insertList, tableName, dbType);
("同步任务执行insert,执行表名:{},共插入{}条数据", tableName, ());
}catch (Exception e){
();
// The insert failed: fall back to inserting only the missing rows.
("------------------------插入出错,执行更新方法--------------------------------");
isInsertList(keyNames,map,tableName,jdbcTemplateSave,handle,dbType);
// updateInsertList(keyNames,map,tableName,jdbcTemplateSave,handle,dbType);
}
} catch (Exception ex) {
();
}finally {
try {
// presumably releases the connections of both templates — receivers garbled, confirm
(().getConnection(), ());
(().getConnection(), ());
} catch (SQLException throwables) {
();
}
}
("------------------------执行完成--------------------------------");
}
/**
* Splits the source rows into updates and inserts by checking, row by row, whether a
* row with the same key values already exists in the target table, then applies both.
* NOTE(review): key values are concatenated into the WHERE clause as quoted literals.
*
* @param keyNames key columns used in the existence check
* @param map source rows
* @param tableName table to update
* @param jdbcTemplateSave template for the target database
* @param handle helper object for the write operations
* @param dbType database type of the target
* @throws Exception propagated from the underlying calls
*/
public void updateInsertList(List<String> keyNames, List<Map<String, Object>> map, String tableName, JdbcTemplate jdbcTemplateSave, HandleMessage handle, String dbType) throws Exception {
if (() > 0) {
JSONArray updateList = new JSONArray();
JSONArray insertList = new JSONArray();
// Find out which rows need an update.
for (int i = 0; i < (); i++) {
StringBuilder sbSelect = new StringBuilder("SELECT count(1) FROM ");
(tableName);
(" WHERE ");
// Build "key1 = 'v1' and key2 = 'v2' ..." from the row's key values.
String whereSql = "";
for (String key : keyNames) {
if ((whereSql)) {
whereSql += " and ";
}
whereSql += key + " = '" + (i).get(key) + "'";
}
(whereSql);
Integer count = ((), );
// (sbSelect + "是否有数据:"+count);
if (count > 0) {
// Row exists in the target.
((i));
} else {
// Row is missing: insert.
((i));
}
}
if (() > 0) {
(jdbcTemplateSave, updateList, tableName, keyNames, dbType);
("同步任务执行update,执行表名:{},共更新{}条数据", tableName, ());
}
(jdbcTemplateSave, insertList, tableName, dbType);
("同步任务执行insert,执行表名:{},共插入{}条数据", tableName, ());
}
}
/**
* Inserts only the rows that the target does not have yet (no updates). It first runs an
* IN query on the first key column, then checks rows one by one with a count query on
* all key columns.
* NOTE(review): values are concatenated into the SQL text as quoted literals.
*
* @param keyNames key columns; the first one drives the IN query
* @param map source rows
* @param tableName target table
* @param jdbcTemplateSave template for the target database
* @param handle helper object for the write operations
* @param dbType database type of the target
* @throws Exception propagated from the underlying calls
*/
public void isInsertList(List<String> keyNames, List<Map<String, Object>> map, String tableName, JdbcTemplate jdbcTemplateSave, HandleMessage handle, String dbType) throws Exception {
// Query by the key (original comment: "dwnm"), check existence, insert when absent.
JSONArray insertList = new JSONArray();
StringBuilder sbSelectList = new StringBuilder("SELECT " + (0) + " FROM " + tableName + " where ");
((0) + " in (");
// Collect the first-key value of every source row.
List<String> list1 = ();
for (int i = 0; i < (); i++) {
((i).get((0)).toString());
}
for (String str : list1) {
("'" + str + "',");
}
// Drop the trailing comma and close the IN list.
(() - 1);
(" )");
List<String> list2 = ((), );
// Set difference (the original comment says list2 - list1; the filter's operands are garbled).
List<String> reduce2 = ().filter(item -> !(item)).collect(());
if (() > 0) {
String dwnmStr = "";
for (int i = 0; i < (); i++) {
StringBuilder sbSelect = new StringBuilder("SELECT count(1) FROM ");
(tableName);
(" WHERE ");
String whereSql = "";
for (String key : keyNames) {
if ((whereSql)) {
whereSql += " and ";
}
whereSql += key + " = '" + (i).get(key) + "'";
}
(whereSql);
Integer count = ((), );
// Row not present in the target: queue it for insertion.
if (count == 0) {
((i));
}
}
if (() > 0) {
(jdbcTemplateSave, insertList, tableName, dbType);
("同步任务执行只insert,执行表名:{},共插入{}条数据", tableName, ());
}
}
}
}
}