java使用多线程操作任务(内部类)

时间:2025-03-10 10:09:30

1、先是Java中对线程池定义的一个接口(ExecutorService)

    //创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
    private ExecutorService executorService = ();

    //创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
    private ExecutorService executorService = (().availableProcessors());

    //创建一个定长线程池,支持定时及周期性任务执行。
    private ExecutorService executorService = (10);

    //创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
    private ExecutorService executorService = ();

2、编写一个内部类实现

 class SyncDatabase implements Runnable {

        private String selectSql;

        public SyncDatabase(String selectSql) {
             = selectSql;
        }

        @Override
        public void run() {
            try {
                ("开始任务:" + selectSql);
               } catch (Exception ex) {
                ();
            }finally {
 
              
            }
            ("------------------------执行完成--------------------------------");
}

3、执行任务

 //这里执行任务
("selectSql");
 

--------------------------------------------------------------------------------------------------------------------------------

最后是一个完整的代码块(编写的是一个插入或更新信息的任务)


package ;

import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import .slf4j.Slf4j;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;

import ;
import ;
import ;
import .*;
import ;
import ;
import ;

@Slf4j
@Service
public class ConversionTaskService extends ServiceSupportSync<ConversionTaskMapper, ConversionTask> implements
        ConversionTaskServiceI {

    private ExecutorService executorService = (().availableProcessors());
//    private ExecutorService executorService = ();

//    //创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
//    private ExecutorService executorService = ();
//
//    //创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
//    private ExecutorService executorService = (().availableProcessors());
//
//    //创建一个定长线程池,支持定时及周期性任务执行。
//    private ExecutorService executorService = (10);
//
//    //创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
//    private ExecutorService executorService = ();


    @Resource
    private Scheduler scheduler;
    @Resource
    ConditionsServiceI conditionsService;
    @Resource
    DataSourceServiceI dataSourceService;
    @Resource
    private Producer producer;

    @Override
    public void sendMqTask(Long taskId) throws Exception {
        sendMqTaskTest(taskId, false);
    }


    /**
     * @param taskId
     * @param testCase false发送mq,true:不发送mq
     * @return
     */
    @Override
    public boolean sendMqTaskTest(Long taskId, boolean testCase) {
        //1、获取数据任务信息
        ConversionTask task = getById(taskId);
        //2、获取条件,看是否需要增加条件
        List<String> condList = (());
        String dataSourceSql = ();
        if (() > 0) {
            dataSourceSql = ((), condList);
        }


        //获取表原数据源
        DataSource dataSource = (());

        JdbcTemplate jdbcTemplate = ().getJdbcTemplate(dataSource);
        //防止单表同步数据量过大时处理
        Integer count = findCountSql(jdbcTemplate, dataSourceSql);
        Integer pageSize = 500;
        if (count > 3000) {
            //如果数据量大雨3000条,查询语句作分页面查询处理
            Integer timeNumber = (count / 3000) + 1;
            int i = 1;
            while (i <= timeNumber) {
                String pageSql = getPageSql((), dataSourceSql, i, 3000);
                i++;
                sendMqMesage(jdbcTemplate, pageSql, task, count, pageSize, testCase);
                if (testCase) {
                    break;
                }
            }
        } else {
            //不分页发送mq数据
            if (count != 0) {
                sendMqMesage(jdbcTemplate, dataSourceSql, task, count, pageSize, testCase);
            }
        }
        return false;
    }


    private void sendMqMesage(JdbcTemplate jdbcTemplate, String pageSql, ConversionTask conversionTask, Integer maxSize, int pageSize) {
        sendMqMesage(jdbcTemplate, pageSql, conversionTask, maxSize, pageSize, false);
    }

    /**
     * 处理任务数据发送mq
     *
     * @param jdbcTemplate
     * @param pageSql
     * @param conversionTask
     * @param maxSize
     * @param pageSize
     */
    private void sendMqMesage(JdbcTemplate jdbcTemplate, String pageSql, ConversionTask conversionTask, Integer maxSize, int pageSize, boolean testCase) {
        List<Map<String, Object>> map = (());
        ("taskId:{},size:{}", () + "", () + "");
        if (testCase) {
            //如果是测试,不发送相关消息到同步去
            map = new ArrayList<>();
        }
        if (() > 0) {
            Integer timeNumber = (() / pageSize) + 1;
            if (timeNumber == 1) {
                sendMqMesage(map, conversionTask, 0, maxSize, pageSql);
            } else if (timeNumber > 1) {
                Integer mapIndex = 1;
                List<List<Map<String, Object>>> mapList = subList(map, pageSize);
                for (List<Map<String, Object>> maps : mapList) {
                    sendMqMesage(maps, conversionTask, mapIndex, maxSize, pageSql);
                    mapIndex++;
                }
            }
        }
    }


    /**
     * 发送数据到mq
     *
     * @param map
     * @param task
     * @param mapIndex
     */
    private void sendMqMesage(List<Map<String, Object>> map, ConversionTask task, Integer mapIndex, Integer maxSize, String dataSourceSQL) {
        if (() > 0) {
//            Map<String, Object> maps = (0);
//            map = new ArrayList<>();
//            (maps);
            String bods = (map, "yyyy-MM-dd HH:mm:ss", );
            DataSource dbSource = (());
            MqTag mqTag = (().toLowerCase());
            String title = "任务:" + () + ",操作表:" + () + "操作记录数/总数:" + () + "/" + maxSize;
            String tag = () + ":" + ();
            if (mapIndex != 0) {
                tag += "-" + mapIndex + "(" + () + "/" + maxSize + ")";
            }
            (bods, title, (), tag, (), dataSourceSQL);
        }
    }


    /**
     * 启动任务 需要自己完善逻辑,这里我用uuid作为taskCode 保证唯一
     * 启动之前要通过数据库查询是否任务已经启动,如果启动了就不能启动了
     * 启动成功了 要把数据库的任务状态改为启动中
     */
    @Override
    public void start() {
        ("启动同步任务.........");
        List<ConversionTask> taskList = findList(().isEnable(ConversionTask.TASK_NOT_UPDATE).build());
        (conversionTask -> {
            startTask(conversionTask);
        });
        ("成功作业成功,启动了:" + () + "个任务!");
    }

    /**
     * 结束所有任务
     */
    @Override
    public void end() {
        List<ConversionTask> taskList = findList(().isEnable(ConversionTask.TASK_NOT_UPDATE).build());
        (conversionTask -> {
            endTask(conversionTask);
        });
    }


    /**
     * 开始任务调度
     *
     * @param taskId 任务名称 需要唯一标识,停止任务时需要用到
     */

    @Override
    public String startTask(String taskId) {
        return startTask(getById(taskId));
    }

    @Override
    public String startTask(ConversionTask task) {
        String msg = () + "任务启动失败";
        //任务开始的corn表达式
        String cronExpress = ();
        //一些特定的批量任务,不能启动(cronExpress == null)
        if ((cronExpress)) {
            try {
                JobDetailFactoryBean jobDetailFactoryBean = new JobDetailFactoryBean();
                (().toString());
                (Scheduler.DEFAULT_GROUP);
                // 是任务所要执行操作的类
                ();
                //任务需要的参数可以通过map方法传递,
                (getJobDataMap(objectToMap(task)));
                ();
                CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
                (().toString());
                (cronExpress);
                (Scheduler.DEFAULT_GROUP);
                ("cron_" + ());
                //先停止再启动(防止启动存在出错)
                endTask(task);
                ();
                ((), ());
                msg = () + "任务启动成功";
                (msg);
            } catch (Exception e) {
                ();
            }
        }
        return msg;
    }

    /**
     * 结束任务调度
     *
     * @param taskId
     */
    @Override
    public void endTask(String taskId) {
        endTask(getById(taskId));
    }

    @Override
    public void endTask(ConversionTask task) {
        try {
            ((().toString(), Scheduler.DEFAULT_GROUP));
//            (() + "任务停止成功");
            ("{}任务停止成功", ());
        } catch (SchedulerException e) {
            ();
            ("{}任务停止失败", ());
        }
    }


    /**
     * 将HashMap转为JobDataMap
     *
     * @param params
     * @return
     */
    private JobDataMap getJobDataMap(Map<String, Object> params) {
        JobDataMap jdm = new JobDataMap();
        Set<String> keySet = ();
        Iterator<String> it = ();
        while (()) {
            String key = ();
//            ("key:" + key);
            (key, (key));
        }
        return jdm;
    }


    /**
     * 将Object对象里面的属性和值转化成Map对象
     *
     * @param obj
     * @return
     * @throws IllegalAccessException
     */
    private Map<String, Object> objectToMap(Object obj) throws IllegalAccessException {
        Map<String, Object> map = new HashMap<>();
        Class<?> clazz = ();
        List<Field> fieldList = new ArrayList<>();
        //当父类为null的时候说明到达了最上层的父类(Object类)
        while (clazz != null) {
            ((()));
            //得到父类,然后赋给自己
            clazz = ();
        }
        for (Field field : fieldList) {
            (true);
            String fieldName = ();
            Object value = (obj);
            (fieldName, value);
        }
        return map;
    }

    /**
     * 把查询数据语句转换成统计语句
     *
     * @param sql 查询语句
     * @return
     */
    private String getCountSql(String sql) {
        if (().toUpperCase().startsWith("SELECT")) {
            StringBuffer sbCount = new StringBuffer("SELECT COUNT(*) FROM ( ");
            (sql);
            (" ) TABLE_COUNT");
            return ();
        }
        return sql;
    }


    /**
     * 根据表名获取主键名称
     *
     * @param tableName
     * @return
     */
    private String getKeyName(String tableName) {
        StringBuffer sbKeySql = new StringBuffer(" SELECT column_name from user_cons_columns cu ");
        (",user_constraints au where cu.constraint_name = au.constraint_name and au.constraint_type = 'P' ");
        (" and au.table_name ='" + tableName + "' ");
        return ();

    }


    /**
     * 设置分页查询
     *
     * @param dbType   数据库驱动类型
     * @param sql      结果集查询
     * @param pageNo   页数
     * @param pageSize 条数
     * @return
     */
    private String getPageSql(String dbType, String sql, int pageNo, int pageSize) {
        String pageSql = sql;
        if (().indexOf("SELECT") > 0) {
            int endPage = pageNo * pageSize;
            int startPage = endPage - pageSize;

            DBDriver driver = (dbType);
            switch (driver) {
                case ORACLE:
                    StringBuffer sbCount = new StringBuffer(" SELECT T1.* ");
                    //这里替换相关的表数据(表名也行)
                    ("FROM ({0}) T1,");
                    ("(SELECT RID FROM (SELECT ROWNUM RN, FROM (SELECT ROWID RID FROM ({1}) ) T");
                    //结束行
                    ("   WHERE ROWNUM <= {2})");
                    //开始行
                    ("    WHERE RN >{3}) T2 WHERE  = ");

                    pageSql = ((), sql, sql, endPage + "", startPage + "");
                    break;
                case MYSQL:
                case KINGBASE:
                    pageSql += "  LIMIT " + startPage + " OFFSET " + endPage;
                default:
            }
        }
        return pageSql;
    }


    /*
     * 将一个list均分成n个list,主要通过偏移量来实现的
     */
    public static <T> List<List<T>> subList(List<T> list, int toIndex) {
        List<List<T>> result = new ArrayList<>();
        int listSize = ();
        for (int i = 0; i < (); i += toIndex) {
            if (i + toIndex > listSize) {
                toIndex = listSize - i;
            }
            List newList = (i, i + toIndex);
            (newList);
        }
        return result;
    }


    /**
     * 获取统计行数
     *
     * @param jdbcTemplate
     * @param dataSourceSql
     * @return
     */
    private Integer findCountSql(JdbcTemplate jdbcTemplate, String dataSourceSql) {
        try {
            Integer count = (getCountSql(dataSourceSql), );
            return count;
        } catch (Exception e) {

        }
        return 0;
    }


    @Override
    public void contrastDatabase(Long dataSourceId, Long dbId, String dataUser, String dbUser) {
        DataSource dataSource = (dataSourceId);
        DataSource sdataSource = (dbId);
//            String sourceSql = "select t.table_name from user_tables t";
        String sourceSql = "select ,t.table_name,t.num_rows from all_tables t ";
        //获取表原数据源
        JdbcTemplate jdbcTemplate = ().getJdbcTemplate(dataSource);
        //获取表原数据源(操作)
        JdbcTemplate sjdbcTemplate = ().getJdbcTemplate(sdataSource);

        String dataSourceSql = sourceSql + " where  = '" + () + "'";
        String sdataSourceSql = sourceSql + " where  = '" + () + "'";

        if ((dataUser)) {
            dataSourceSql = sourceSql + " where  = '" + dataUser + "'";
        }
        if ((dataUser)) {
            sdataSourceSql = sourceSql + " where  = '" + dbUser + "'";
        }

        List<Map<String, Object>> map = (dataSourceSql);
        List<Map<String, Object>> smap = (sdataSourceSql);

//            List<String> tableAllList1 = ();
//            List<String> tableAllList2 = ();

        List<String> msgSt = ();
        List<String> msgbt = ();
        String tableNames = "";

        for (Map<String, Object> objMap : map) {
            String owner = ("OWNER").toString();
            String tableName = owner + "." + ("table_name").toString();
            ("表名:" + tableName);
//                List<String> keyNames = (tableName);
//                if (() == 0) {
//                    getKeyNames(tableName, jdbcTemplate, true);
//                    continue;
//                }
//                (tableName + "表主键:" + ());

            switch (tableName) {
                case "ZYC.T_TS_TZSBXX_TJY_TBJLB":
                case "ZYC.T_TS_QYXX_TJY_TBJLB":
                case "ZYC.T_TS_TZSBXXB_XGJLB":
                    (tableName + "表数据过大,不统计!");
                    continue;
                default:

            }

            StringBuilder sbSelect = new StringBuilder("SELECT count(1) FROM ");
            (tableName);
            Integer count1 = ((), );
            try {
                Integer count2 = ((), );
                String msg = () + ",tableName:" + tableName + ",count:" + count1 + "------" + () + ",tableName:" + tableName + ",count:" + count2;
                if ((count2)) {
                    (msg);
                } else {
//                        StringBuilder deleteSql = new StringBuilder("DELETE FROM  ");
//                        (tableName);
//                        (());
//                        deleteSql(tableName, sdataSource);


//                        StringBuffer sbSql = new StringBuffer();
//                        (" SELECT * FROM ");
//                        (tableName);
//                        (" ");
//                        String keyNameSql = getKeyName(tableName);
//                        List<String> keyNames = (keyNameSql, );
//                        List<String> keyNames =  (tableName);
//                        if (() == 0) {
//                            getKeyNames(tableName, jdbcTemplate,true);
//                            continue;
//                        }
//                        (tableName  +"表主键:"+());
//                       keyNames =  (tableName);
//                        if (() == 0) {
//                            getKeyNames(tableName, sjdbcTemplate,true);
//                            continue;
//                        }

                    if ((tableNames)) {
                        tableNames += ",";
                    }
                    tableNames += tableName;
                    (msg);
                }
            } catch (Exception ex) {
                String msg = () + ",tableName:" + tableName + ",count:" + count1 + "------" + () + ",tableName:" + tableName + "没找到表!";
                (msg);
            }
        }
        ("-----相同数据------");
        ().forEach(::println);
        ("-----相同数据 --end------");

        ("-----不同数据------");
        ().forEach(::println);
        ("-----不同数据 --end------");
        ("-----执行完成------");
        (tableNames);
//            syncDatabase(taskId, tableNames);

        // 交集
//            List<String> intersection = ().filter(item -> (item)).collect(());
//            ("---交集 intersection---");
//            ().forEach( :: println);

        // 差集 (list1 - list2)
//            List<String> reduce1 = ().filter(item -> !(item)).collect(());
//            ("---差集 reduce1 (list1 - list2)---");
//            ().forEach(::println);
//
//            // 差集 (list2 - list1)
//            List<String> reduce2 = ().filter(item -> !(item)).collect(());
//            ("---差集 reduce2 (list2 - list1)---");
//            ().forEach(::println);

        // 并集
//            List<String> listAll = ().collect(());
//            List<String> listAll2 = ().collect(());
//            (listAll2);
//            ("---并集 listAll---");
//            ().forEachOrdered( :: println);

        // 去重并集
//            List<String> listAllDistinct = ().distinct().collect(());
//            ("---得到去重并集 listAllDistinct---");
//            ().forEachOrdered( :: println);
//
//            ("---原来的List1---");
//            ().forEachOrdered( :: println);
//            ("---原来的List2---");
//            ().forEachOrdered( :: println);


    }

    @Override
    public void syncTableAnnotation() {
        List<ConversionTask> taskList = findList(().isEnable(ConversionTask.TASK_NOT_UPDATE).build());
        (conversionTask -> {
            updateConversionAnnotation((),conversionTask,0);
        });
    }


    public void updateConversionAnnotation(Long dbId,ConversionTask conversionTask,Integer isUpdate){
        DataSource dataSource = (dbId);
        String sourceSql = "select * from user_tab_comments WHERE TABLE_name =";
        String tableName = ();
        if((".")> 0){
            tableName = ((".")+1);
        }
        sourceSql += "'"+tableName+"'";
        //获取表原数据源
        //mysql 语句不一样处理
        switch (()) {
            case :
                sourceSql = "select TABLE_NAME,TABLE_COMMENT as COMMENTS  from INFORMATION_SCHEMA.TABLES ";
                sourceSql += "WHERE   TABLE_TYPE = 'BASE TABLE' AND TABLE_SCHEMA ='"+()+"'" ;
                sourceSql += " AND TABLE_NAME ='"+tableName+"'";
                break;
            default:
        }

        JdbcTemplate jdbcTemplate = ().getJdbcTemplate(dataSource);
        List<Map<String, Object>> map = (sourceSql);
        (sourceSql);
        for (Map<String, Object> map1:  map) {
            String comments = ("COMMENTS").toString();
            (tableName+":"+comments);
            if((comments)){
                (comments);
                updateById(conversionTask);
            }else{
                if(isUpdate == 0){
                    updateConversionAnnotation((),conversionTask,1);
                }
            }
        }

        if(() == 0 ){
            if(isUpdate == 0){
                updateConversionAnnotation((),conversionTask,1);
            }
        }

        if(isUpdate == 1){
            updateConversionAnnotation(202110250000000005L,conversionTask,2);
        }

    }

    /**
     * @param
     * @Description:
     * @Author: xuzj
     * @Title: 对比两数据库数据量
     * @exception:
     * @Date: 2021-0-28 8:52
     **/
    @Override
    public void contrastDatabase(Long taskId) {
        //1、获取数据任务信息
        ConversionTask task = getById(taskId);
        //这里获取到的是配置的相应的实例名,如
        if (().equals("整库同步")) {
            contrastDatabase((), (), null, null);
        }
    }


    /**
     * 手动选择数据源同步任务
     * @param dataId
     * @param dbId
     * @param dataUser
     * @param dbUser
     * @param tableNames
     */
    @Override
    public void syncDatabase(Long dataId, Long dbId, String dataUser, String dbUser, String tableNames) {
        //获取表原数据源
        DataSource dataSource = (dataId);
        //获取表原数据源(操作)
        DataSource sdataSource = (dbId);

        String sourceSql = "select ,t.table_name,t.num_rows from all_tables t ";
        //获取表原数据源
        JdbcTemplate jdbcTemplate = ().getJdbcTemplate(dataSource);
        //获取表原数据源(操作)
        JdbcTemplate sjdbcTemplate = ().getJdbcTemplate(sdataSource);

        String dataSourceSql = sourceSql + " where  = '" + () + "'";
        String sdataSourceSql = sourceSql + " where  = '" + () + "'";

        if ((dataUser)) {
            dataSourceSql = sourceSql + " where  = '" + dataUser + "'";
        }
        if ((dbUser)) {
            sdataSourceSql = sourceSql + " where  = '" + dbUser + "'";
        }


        List<Map<String, Object>> map = (dataSourceSql);
        List<Map<String, Object>> smap = (sdataSourceSql);

        for (Map<String, Object> objMap : map) {
            String tableName = ("table_name").toString();
            String owner = ("OWNER").toString();
            if ((dataUser)) {
                tableName = dataUser + "." + tableName;
            }
            if (("JCC")) {
                tableName = owner + "." + tableName;
            }

            //验证是否有指定表同步
            if (!isSyncTable(tableName, tableNames)) {
                continue;
            }
            String newTableName = (owner + "\\.", "");
            if (!isTableThere(newTableName, smap)) {
                ("表" + tableName + "没有找到表!" + ());
                continue;
            }
//                    Integer rows = (("num_rows").toString());
            StringBuffer countSql = new StringBuffer();
            (" SELECT count(*) FROM " + tableName);
            Integer rows = ((), );
            if (rows > 0) {
                //启动多线程处理数据
                StringBuffer sbSql = new StringBuffer();
                (" SELECT * FROM ");
                (tableName);
                (" ");
                String keyNameSql = getKeyName(tableName);
                List<String> keyNames = ();
                try {
                    keyNames = (tableName);
                    if (() == 0) {
                        keyNames = (keyNameSql, );
                    }
                    if (() == 0) {
                        keyNames = getKeyNames(tableName, jdbcTemplate, false);
                    }
                } catch (DataAccessException e) {
                    keyNames = getKeyNames(tableName, jdbcTemplate, false);
                }

                //如果数据量大雨3000条,查询语句作分页面查询处理
                Integer timeNumber = (rows / 1000) + 1;
                int i = 1;
                while (i <= timeNumber) {
                    String pageSql = getPageSql((), (), i, 1000);
                    i++;
                    //这里执行任务
                    (new SyncDatabase(pageSql, jdbcTemplate, sjdbcTemplate, keyNames, tableName, true, ()));
                }
            }
        }
    }

    /**
     * 整个库同步
     *
     * @param taskId
     */
    @Override
    public void syncDatabase(Long taskId, String tableNames) {
        //1、获取数据任务信息
        ConversionTask task = getById(taskId);
        //这里获取到的是配置的相应的实例名,如
        if (().equals("整库同步")) {
            syncDatabase((),(),null,null,tableNames);
        }
    }


    /**
     * 验证操作数据库表是否存在
     *
     * @param tableName
     * @param smap
     * @return
     */
    public boolean isTableThere(String tableName, List<Map<String, Object>> smap) {
        //验证被操作数据是否有存在表
        for (Map<String, Object> map : smap) {
            String tableName1 = ("table_name").toString();
            if ((tableName1)) {
                return true;
            }
        }
        return false;
    }


    /**
     * 查出主键名称和创建主键
     *
     * @param tableName
     * @param jdbcTemplate
     * @return
     */
    public List<String> getKeyNames(String tableName, JdbcTemplate jdbcTemplate, boolean inserts) {
        List<String> keyNames = ();

        StringBuffer sbSql = new StringBuffer(" SELECT T.COLUMN_NAME FROM USER_TAB_COLUMNS T WHERE T.TABLE_NAME=");
        ("'" + tableName + "'");
        List<String> columnsList = ((), );


        keyNames = (tableName);
        if (() == 0) {
            List<DBColumn> dbColumns = (tableName);
            columnsList = ().map(p -> ()).collect(());
        }
        if (keyNames != null || () > 0) {
            if (() > 0) {
                ("表" + tableName + "没有找到主键!" + ());
                List<String> keyList = ();
                ("ID");
                ("SBLSH");
                ("COMVEHICLEID");
                ("VESSELID");
                ("CRANEID");
                ("BOILERID");
                ("ENTERTAINMENTID");
                ("SUPREGULARINSPECTINFOID");
                ("ELEVATORID");

                for (String key : keyList) {
                    for (String key2 : columnsList) {
                        if ((key2)) {
                            (key2);
                            break;
                        }
                    }
                }
            }
            if (inserts) {
                StringBuffer sbKeySql = new StringBuffer();
                try {
                    //添加主键
                    (" ALTER TABLE ");
                    (tableName);
                    (" ADD CONSTRAINT ");
                    String newTableName = ("ZYC.", "");
                    ("PK_" + newTableName + "_ID");

//                for (String keyName : keyNames) {
//                    ("_"+keyName);
//                }
                    (" PRIMARY KEY(");
                    for (String keyName : keyNames) {
                        (keyName + ",");
                    }
                    (() - 1);
                    (")");
//                    (());
                } catch (Exception ex) {
                    ();
                    ("表" + tableName + "创建主键失败:" + ());
                }
            }
        }
        return keyNames;
    }

 

    /**
     * 验证指定表是否存在
     *
     * @param tableName
     * @param tableS
     * @return
     */
    public boolean isSyncTable(String tableName, String tableS) {
        if ((tableS)) {
            String[] tabls = (",");
            for (String tabName : tabls) {
                if ((tableName)) {
                    return true;
                }
            }
        } else {
            return true;
        }
        return false;
    }

    class SyncDatabase implements Runnable {

        private String selectSql;
        //来原 数据源
        private JdbcTemplate jdbcTemplateYuan;
        //保存数据源
        private JdbcTemplate jdbcTemplateSave;

        private List<String> keyNames;

        private String tableName;

        private boolean isInsert;

        private String dbType;

        private HandleMessage handle = new HandleMessage();

        public SyncDatabase(String selectSql, JdbcTemplate jdbcTemplateYuan, JdbcTemplate jdbcTemplateSave, List<String> keyNames, String tableName, boolean isInsert, String dbType) {
             = selectSql;
             = jdbcTemplateYuan;
             = jdbcTemplateSave;
             = keyNames;
             = tableName;
             = isInsert;
             = dbType;
        }

        @Override
        public void run() {
            try {
                ("开始任务:" + selectSql);
                List<Map<String, Object>> map = (selectSql);
                ("开始操作:" + () + ",主键:" + ());
                JSONArray updateList = new JSONArray();
                JSONArray insertList = new JSONArray();
                //只做对比插入
//                isInsertList(keyNames, map, tableName, jdbcTemplateSave, handle, dbType);
                isInsert = true;
                //只插入数据
                if (isInsert) {
                    for (int i = 0; i < (); i++) {
                        ((i));
                    }
                } else {
                    //更新和插入数据
                    updateInsertList(keyNames,map,tableName,jdbcTemplateSave,handle,dbType);
                }
                try {
                    (jdbcTemplateSave, insertList, tableName, dbType);
                    ("同步任务执行insert,执行表名:{},共插入{}条数据", tableName, ());
                }catch (Exception e){
                    ();
                    ("------------------------插入出错,执行更新方法--------------------------------");
                    isInsertList(keyNames,map,tableName,jdbcTemplateSave,handle,dbType);
//                    updateInsertList(keyNames,map,tableName,jdbcTemplateSave,handle,dbType);
                }
            } catch (Exception ex) {
                ();
            }finally {
                try {
                    (().getConnection(), ());
                    (().getConnection(), ());
                } catch (SQLException throwables) {
                    ();
                }
            }
            ("------------------------执行完成--------------------------------");
        }


        /**
         * @param keyNames         条件数据
         * @param map              源数据集合
         * @param tableName        更新表名称
         * @param jdbcTemplateSave 执行数据源
         * @param handle           操作对象
         * @param dbType           更新数据库类型
         * @throws Exception
         */
        public void updateInsertList(List<String> keyNames, List<Map<String, Object>> map, String tableName, JdbcTemplate jdbcTemplateSave, HandleMessage handle, String dbType) throws Exception {
            if (() > 0) {
                JSONArray updateList = new JSONArray();
                JSONArray insertList = new JSONArray();

                //end 对比插入
                //查询需要更新的数据
                for (int i = 0; i < (); i++) {
                    StringBuilder sbSelect = new StringBuilder("SELECT count(1) FROM ");
                    (tableName);
                    (" WHERE ");
                    String whereSql = "";
                    for (String key : keyNames) {
                        if ((whereSql)) {
                            whereSql += " and ";
                        }
                        whereSql += key + " = '" + (i).get(key) + "'";
                    }
                    (whereSql);
                    Integer count = ((), );
                    //                (sbSelect + "是否有数据:"+count);
                    if (count > 0) {
                        ((i));
                    } else {
                        //插入
                        ((i));
                    }
                }
                if (() > 0) {
                    (jdbcTemplateSave, updateList, tableName, keyNames, dbType);
                    ("同步任务执行update,执行表名:{},共更新{}条数据", tableName, ());
                }

                (jdbcTemplateSave, insertList, tableName, dbType);
                ("同步任务执行insert,执行表名:{},共插入{}条数据", tableName, ());
            }
        }


        /**
         * 获取插入数据源没有的数据进行插入(不更新)
         *
         * @param keyNames
         * @param map
         * @param tableName
         * @param jdbcTemplateSave
         * @param handle
         * @param dbType
         */
        public void isInsertList(List<String> keyNames, List<Map<String, Object>> map, String tableName, JdbcTemplate jdbcTemplateSave, HandleMessage handle, String dbType) throws Exception {
            //根据dwnm查询数据后,判断是否存在,如果不存在,直接插入
            JSONArray insertList = new JSONArray();
            StringBuilder sbSelectList = new StringBuilder("SELECT  " + (0) + "  FROM " + tableName + " where ");
            ((0) + " in (");
            List<String> list1 = ();

            for (int i = 0; i < (); i++) {
                ((i).get((0)).toString());
            }

            for (String str : list1) {
                ("'" + str + "',");
            }
            (() - 1);
            (" )");

            List<String> list2 = ((), );

            // 差集 (list2 - list1)
            List<String> reduce2 = ().filter(item -> !(item)).collect(());
//            ("---差集 reduce2 (list2 - list1)---");
//            ().forEach( :: println);

            if (() > 0) {
                String dwnmStr = "";
                for (int i = 0; i < (); i++) {
//                    String dwnmValue = (i).get((0)).toString();
//                    //去重
//                    if((dwnmValue) >-1){
//                        continue;
//                    }
//                    Optional<String> optional = ().filter(s->(dwnmValue)).findFirst();
//                    if(()) {
                    StringBuilder sbSelect = new StringBuilder("SELECT count(1) FROM ");
                    (tableName);
                    (" WHERE ");
                    String whereSql = "";
                    for (String key : keyNames) {
                        if ((whereSql)) {
                            whereSql += " and ";
                        }
                        whereSql += key + " = '" + (i).get(key) + "'";
                    }
                    (whereSql);
                    Integer count = ((), );
                    if (count == 0) {
                        ((i));
                    }
//                        dwnmStr += dwnmValue;
//                    }
                }
                if (() > 0) {
                    (jdbcTemplateSave, insertList, tableName, dbType);
                    ("同步任务执行只insert,执行表名:{},共插入{}条数据", tableName, ());
                }
            }
        }
    }





}