sqluldr2全量数据库导入导出和java-java语言介入(3)

时间:2024-04-07 11:36:50
我们为什么要用java呢,当然还是因为java熟悉呗,毕竟程序用的是java,linux sh不是每个人都可以登录去玩的,所以我们用java做了 一套管理导入导出软件

软件 如图(在实际操作中使用多线程技术 千万级别 数个g的文件 半个钟头以内导出导入是没问题的,当然可以进一步优化,但是目前基本可以满足大部分要求)

sqluldr2全量数据库导入导出和java-java语言介入(3)

sqluldr2全量数据库导入导出和java-java语言介入(3)

主要还是 通过java 生成sh脚本 去调用 sqluldr2导出 sqldr导入而已 很简单  这里只是简单的程序 说明下



1.我们定义一个接口 导出 导入


public interface AllBaseDataHandle {

/**
* 导出操作

* @param expTaskDto
* @param isUpload
* @param config
* @return
*/
public ExpTaskResultVo beginExp(ExpTaskDto expTaskDto,String destFileName, boolean isUpload, String configJson);

/**
* 导入操作

* @param impTaskDto
* @param waitFlag
* @return
*/
public Integer beginImp(ImpTaskDto impTaskDto, boolean waitFlag);

}
2.问题的核心  就是java生成 sqluldr2.sh 文件  ctl控制文件 sqlldr文件
这里给出相关代码
a.sqluldr2 
/**
     * 生成导出SQL
     * @return
     */
    protected Map<String,String> getExpSQL(String directory) {
    String[] tables = allBaseDataDBVo.getBaseTable();
   
    Map<String,String> sqls = new HashMap<String,String>();
    //shell脚本存放地址
    String shellDir = this.getDirectory(UUID.randomUUID().toString());
    logger.info("shell 脚本存放地址:{}", shellDir);
    for(String table : tables){
    StringBuffer cmdStr = new StringBuffer("sqluldr2 ");
    cmdStr.append("user=\"").append(allBaseDataDBVo.getExpUser()).append("/").append(allBaseDataDBVo.getExpPwd()).append("@").append(allBaseDataDBVo.getExpIp()).append("\" ");
    cmdStr.append("query=\"select t.* from ").append(table).append(" t\" ");
    cmdStr.append("field=\"").append(allBaseDataDBVo.getSeparate()).append("\" ");//数据分割符
    cmdStr.append("record=0x0a ");
    //按记录数切分文件的功能取决于三个命令行选项: FILE, ROWS, BATCH
    cmdStr.append("file=\"").append(directory).append(table).append("-%b.txt\" ");
    cmdStr.append("rows=100000 ");
    cmdStr.append("batch=yes ");
    //用户生成ctl文件
    /*cmdStr.append("table=\"").append(table).append("\" ");
    cmdStr.append("control=\"").append(directory).append(table).append("_sqlldr.ctl\" ");*/
    cmdStr.append("safe=yes head=yes ");
    cmdStr.append("charset=UTF8 ");
    cmdStr.append("array=100 ");
    cmdStr.append("read=218 ");
    cmdStr.append("serial=0 ");
    String sql = cmdStr.toString();
    logger.info("cmdStr: {}", sql);
   
    String path  = shellDir + table + ".sh";
    BufferedWriter bw = null;
    try {
bw = new BufferedWriter(new FileWriter(path, false));
bw.write(sql);
bw.flush();
IOUtils.closeQuietly(bw);
} catch (IOException e) {
logger.error("生成导出SQL异常:", e);
} finally {
IOUtils.closeQuietly(bw);
}
   
    String javaCmd = "bash " + path;
    logger.info("sqluldr2.sh脚本地址:: {}, java程序执行的命令:{}", path, javaCmd);
   
    if (isWindows()) {//如果当前操作系统为windows
    sqls.put(table, sql);
    }else{
    sqls.put(table, javaCmd);
    }
    }
   
    return sqls.size() > 0 ? sqls : null;
    }
b.ctl控制文件(主要是根据sqluldr2导出的txt文件生成,同时处理好相应的time字段)
/**
* 解析数据文件 目的:
* 1.获取数据库字段Filed
* 2.对每一行数据中的‘换行符’进行过滤
* 3.对Fileds字段中的‘时间类型’进行过滤
* 4.生成对应的ctl文件
* @param path
* @param fileName
* @return
*/
private String parseFile(String path, String fileName){
logger.info("分析文件:{},开始。。。。。。。。。。。。。。。。", path+fileName);
String separate = allBaseDataDBVo.getSeparate();
//String separate = ",$";
logger.info("解析文件分割符:{}", separate);

File file = new File(path + fileName);
if(null == file || !file.isFile() || !file.exists()){
        logger.error("需要解析的文件不存在!");
        return null;
        }
String fileds = "";     //数据库字段Filed
int count = 0;//分割符的数量
File tmpFile = new File(path + fileName + ".tmp");

List<String> lines = new ArrayList<String>();
FileInputStream inputStream = null;
InputStreamReader inputStreamReader = null;
BufferedReader reader = null;
try {
inputStream = new FileInputStream(file);
inputStreamReader = new InputStreamReader(inputStream, CHARSET);
reader = new BufferedReader(inputStreamReader);
String tempString = null;
String line = "";
int tmpCount = 0;

//1.获取数据库字段Filed
if ((tempString = reader.readLine()) != null) {
fileds = tempString;
}
count = divCount(fileds, separate);
if(count == 0){
logger.error("该文件:{},解析异常,不能正常解析获取数据库字段Filed", file.getAbsolutePath());
return null;
}
lines.add(fileds+SEPARATOR);
logger.info("分析文件:获取数据库字段Filed:{}", fileds);

//2.对每一行数据中的‘换行符’进行过滤 .一次读入一行,直到读入null为文件结束
while ((tempString = reader.readLine()) != null) {
int currentDivCount = divCount(tempString, separate);
currentDivCount += tmpCount;
if ( count == currentDivCount ) {
line = line + tempString + SEPARATOR;
lines.add(line);
line = "";
tmpCount = 0;
} else {
tmpCount = currentDivCount;
tempString = tempString.replace(System.getProperty("line.separator"), "");
line = line + tempString;
}
}
writeDataToFile(lines, tmpFile);
FileUtils.copyFile(tmpFile, file, true);
if (tmpFile.exists()) {
tmpFile.delete();
}
//tmpFile.renameTo(file);
} catch (Exception e) {
logger.error("", e);
return null;
} finally {
IOUtils.closeQuietly(reader);
IOUtils.closeQuietly(inputStreamReader);
}
logger.info("对每一行数据中的‘换行符’进行过滤成功!");

// 3.对Fileds字段中的时间类型 进行过滤
char[] st = separate.toCharArray();
StringBuffer bBuffer = new StringBuffer();
for(char temp : st){
bBuffer.append("\\").append(temp);
}
String[] FileldArray = fileds.split(bBuffer.toString());
StringBuffer bf = new StringBuffer();
for(String filed : FileldArray){
boolean bool = filed.toUpperCase().contains("_TIME");
filed = bool ? filed + " TIMESTAMP \"YYYY-MM-DD HH24:MI:SSXFF\"" : filed;
bf.append(filed).append(",");
}
fileds = bf.toString().substring(0, bf.length()-1);
logger.info("对Fileds字段中的‘时间类型’进行过滤:{}", fileds);

//4.生成对应的ctl文件
String tableName = fileName.split("\\-")[0];
String ctlFileName = path + fileName + ".ctl";
BufferedWriter bw = null;
try {
bw = new BufferedWriter(new FileWriter(ctlFileName , false));
bw.write("load data\r\n ");
bw.write("characterset AL32UTF8\r\n ");//设置导入的编码
bw.write("infile '" + file.getAbsolutePath()+"'\r\n ");
//bw.write("truncate\r\n ");  //删除旧记录,替换成新装载的记录
bw.write("append\r\n ");  //在表中追加新记录
bw.write("into table "+tableName+"\r\n ");
bw.write("fields terminated by '"+separate+"'\r\n ");
bw.write("trailing nullcols\r\n ");
bw.write("(" + fileds + ")\r\n ");
bw.flush();
} catch (Exception e) {
logger.error("", e);
}finally {
IOUtils.closeQuietly(bw);
}
        
        logger.info("该数据{},对应的ctl文件:{}",file.getAbsolutePath(), ctlFileName);
        return new File(ctlFileName).getName();
}
c.sqlldr文件
private void importDataToDB(String filePath, String fileName) throws Exception{
logger.info(String.format("ALL_BASE_DATA-基础资料包全量导入,导入数据库信息,用户名:%s, 密码:%s, ip:%s", allBaseDataDBVo.getImpUser(),allBaseDataDBVo.getImpPwd(),allBaseDataDBVo.getImpIp()));
     
StringBuffer sb = new StringBuffer("sqlldr ");
sb.append("userid=").append(allBaseDataDBVo.getImpUser()).append("/").append(allBaseDataDBVo.getImpPwd()).append("@").append(allBaseDataDBVo.getImpIp()).append(" ");
sb.append("skip=1").append(" ");//忽略第一行
sb.append("control=").append(filePath+fileName).append(" ");
sb.append("log=").append(filePath+fileName).append(".log ");
sb.append("bad=").append(filePath+fileName).append(".bad ");
sb.append("readsize=21971520 ");
sb.append("streamsize=21971520 ");
sb.append("date_cache=21971520 ");
sb.append("columnarrayrows=760000 ");
sb.append("bindsize=5000 ");
sb.append("direct=Y ");
String dos = sb.toString();
logger.info("ALL_BASE_DATA-sqlldr命令L:{},基础资料全量导入", dos);

Process process = null;
try {
process = Runtime.getRuntime().exec(dos);
        InputStream ins = process.getInputStream(); // 获取执行cmd命令后的信息;
        BufferedReader reader = new BufferedReader(new InputStreamReader(ins));
        String line = null;
        while ((line = reader.readLine()) != null){
            String msg = new String(line.getBytes("ISO-8859-1"), CHARSET);
            logger.info(msg);// 输出
        }
        int exitValue = process.waitFor();
        logger.info(exitValue==0 ? "返回值:{},数据导入成功!" : "返回值:{},数据导入失败!", exitValue);
} catch (Exception e) {
logger.error("执行sqlldr脚本异常:\r\n{}",e);
} finally {
process.getOutputStream().close(); // 关闭
}
}

其中allBaseDataDBVo类 只是导出导入数据库的用户名密码 ip之类的pojo 


 2.当然以上只是给出相应的文件生成,有了这些 ,在java层面 我们可以使用并发库等技术 生成导入导出等等
具体细节不在表述