软件 如图(在实际操作中使用多线程技术 千万级别 数个g的文件 半个钟头以内导出导入是没问题的,当然可以进一步优化,但是目前基本可以满足大部分要求)
主要还是 通过java 生成sh脚本 去调用 sqluldr2导出 sqldr导入而已 很简单 这里只是简单的程序 说明下
1.我们定义一个接口 导出 导入
public interface AllBaseDataHandle {
/**
* 导出操作
*
* @param expTaskDto
* @param isUpload
* @param config
* @return
*/
public ExpTaskResultVo beginExp(ExpTaskDto expTaskDto,String destFileName, boolean isUpload, String configJson);
/**
* 导入操作
*
* @param impTaskDto
* @param waitFlag
* @return
*/
public Integer beginImp(ImpTaskDto impTaskDto, boolean waitFlag);
}
2.问题的核心 就是java生成 sqluldr2.sh 文件 ctl控制文件 sqlldr文件
这里给出相关代码
a.sqluldr2
/**
* 生成导出SQL
* @return
*/
protected Map<String,String> getExpSQL(String directory) {
String[] tables = allBaseDataDBVo.getBaseTable();
Map<String,String> sqls = new HashMap<String,String>();
//shell脚本存放地址
String shellDir = this.getDirectory(UUID.randomUUID().toString());
logger.info("shell 脚本存放地址:{}", shellDir);
for(String table : tables){
StringBuffer cmdStr = new StringBuffer("sqluldr2 ");
cmdStr.append("user=\"").append(allBaseDataDBVo.getExpUser()).append("/").append(allBaseDataDBVo.getExpPwd()).append("@").append(allBaseDataDBVo.getExpIp()).append("\" ");
cmdStr.append("query=\"select t.* from ").append(table).append(" t\" ");
cmdStr.append("field=\"").append(allBaseDataDBVo.getSeparate()).append("\" ");//数据分割符
cmdStr.append("record=0x0a ");
//按记录数切分文件的功能取决于三个命令行选项: FILE, ROWS, BATCH
cmdStr.append("file=\"").append(directory).append(table).append("-%b.txt\" ");
cmdStr.append("rows=100000 ");
cmdStr.append("batch=yes ");
//用户生成ctl文件
/*cmdStr.append("table=\"").append(table).append("\" ");
cmdStr.append("control=\"").append(directory).append(table).append("_sqlldr.ctl\" ");*/
cmdStr.append("safe=yes head=yes ");
cmdStr.append("charset=UTF8 ");
cmdStr.append("array=100 ");
cmdStr.append("read=218 ");
cmdStr.append("serial=0 ");
String sql = cmdStr.toString();
logger.info("cmdStr: {}", sql);
String path = shellDir + table + ".sh";
BufferedWriter bw = null;
try {
bw = new BufferedWriter(new FileWriter(path, false));
bw.write(sql);
bw.flush();
IOUtils.closeQuietly(bw);
} catch (IOException e) {
logger.error("生成导出SQL异常:", e);
} finally {
IOUtils.closeQuietly(bw);
}
String javaCmd = "bash " + path;
logger.info("sqluldr2.sh脚本地址:: {}, java程序执行的命令:{}", path, javaCmd);
if (isWindows()) {//如果当前操作系统为windows
sqls.put(table, sql);
}else{
sqls.put(table, javaCmd);
}
}
return sqls.size() > 0 ? sqls : null;
}
b.ctl控制文件(主要是根据sqluldr2导出的txt文件生成,同时处理好相应的time字段)
/**
* 解析数据文件 目的:
* 1.获取数据库字段Filed
* 2.对每一行数据中的‘换行符’进行过滤
* 3.对Fileds字段中的‘时间类型’进行过滤
* 4.生成对应的ctl文件
* @param path
* @param fileName
* @return
*/
private String parseFile(String path, String fileName){
logger.info("分析文件:{},开始。。。。。。。。。。。。。。。。", path+fileName);
String separate = allBaseDataDBVo.getSeparate();
//String separate = ",$";
logger.info("解析文件分割符:{}", separate);
File file = new File(path + fileName);
if(null == file || !file.isFile() || !file.exists()){
logger.error("需要解析的文件不存在!");
return null;
}
String fileds = ""; //数据库字段Filed
int count = 0;//分割符的数量
File tmpFile = new File(path + fileName + ".tmp");
List<String> lines = new ArrayList<String>();
FileInputStream inputStream = null;
InputStreamReader inputStreamReader = null;
BufferedReader reader = null;
try {
inputStream = new FileInputStream(file);
inputStreamReader = new InputStreamReader(inputStream, CHARSET);
reader = new BufferedReader(inputStreamReader);
String tempString = null;
String line = "";
int tmpCount = 0;
//1.获取数据库字段Filed
if ((tempString = reader.readLine()) != null) {
fileds = tempString;
}
count = divCount(fileds, separate);
if(count == 0){
logger.error("该文件:{},解析异常,不能正常解析获取数据库字段Filed", file.getAbsolutePath());
return null;
}
lines.add(fileds+SEPARATOR);
logger.info("分析文件:获取数据库字段Filed:{}", fileds);
//2.对每一行数据中的‘换行符’进行过滤 .一次读入一行,直到读入null为文件结束
while ((tempString = reader.readLine()) != null) {
int currentDivCount = divCount(tempString, separate);
currentDivCount += tmpCount;
if ( count == currentDivCount ) {
line = line + tempString + SEPARATOR;
lines.add(line);
line = "";
tmpCount = 0;
} else {
tmpCount = currentDivCount;
tempString = tempString.replace(System.getProperty("line.separator"), "");
line = line + tempString;
}
}
writeDataToFile(lines, tmpFile);
FileUtils.copyFile(tmpFile, file, true);
if (tmpFile.exists()) {
tmpFile.delete();
}
//tmpFile.renameTo(file);
} catch (Exception e) {
logger.error("", e);
return null;
} finally {
IOUtils.closeQuietly(reader);
IOUtils.closeQuietly(inputStreamReader);
}
logger.info("对每一行数据中的‘换行符’进行过滤成功!");
// 3.对Fileds字段中的时间类型 进行过滤
char[] st = separate.toCharArray();
StringBuffer bBuffer = new StringBuffer();
for(char temp : st){
bBuffer.append("\\").append(temp);
}
String[] FileldArray = fileds.split(bBuffer.toString());
StringBuffer bf = new StringBuffer();
for(String filed : FileldArray){
boolean bool = filed.toUpperCase().contains("_TIME");
filed = bool ? filed + " TIMESTAMP \"YYYY-MM-DD HH24:MI:SSXFF\"" : filed;
bf.append(filed).append(",");
}
fileds = bf.toString().substring(0, bf.length()-1);
logger.info("对Fileds字段中的‘时间类型’进行过滤:{}", fileds);
//4.生成对应的ctl文件
String tableName = fileName.split("\\-")[0];
String ctlFileName = path + fileName + ".ctl";
BufferedWriter bw = null;
try {
bw = new BufferedWriter(new FileWriter(ctlFileName , false));
bw.write("load data\r\n ");
bw.write("characterset AL32UTF8\r\n ");//设置导入的编码
bw.write("infile '" + file.getAbsolutePath()+"'\r\n ");
//bw.write("truncate\r\n "); //删除旧记录,替换成新装载的记录
bw.write("append\r\n "); //在表中追加新记录
bw.write("into table "+tableName+"\r\n ");
bw.write("fields terminated by '"+separate+"'\r\n ");
bw.write("trailing nullcols\r\n ");
bw.write("(" + fileds + ")\r\n ");
bw.flush();
} catch (Exception e) {
logger.error("", e);
}finally {
IOUtils.closeQuietly(bw);
}
logger.info("该数据{},对应的ctl文件:{}",file.getAbsolutePath(), ctlFileName);
return new File(ctlFileName).getName();
}
c.sqlldr文件
private void importDataToDB(String filePath, String fileName) throws Exception{
logger.info(String.format("ALL_BASE_DATA-基础资料包全量导入,导入数据库信息,用户名:%s, 密码:%s, ip:%s", allBaseDataDBVo.getImpUser(),allBaseDataDBVo.getImpPwd(),allBaseDataDBVo.getImpIp()));
StringBuffer sb = new StringBuffer("sqlldr ");
sb.append("userid=").append(allBaseDataDBVo.getImpUser()).append("/").append(allBaseDataDBVo.getImpPwd()).append("@").append(allBaseDataDBVo.getImpIp()).append(" ");
sb.append("skip=1").append(" ");//忽略第一行
sb.append("control=").append(filePath+fileName).append(" ");
sb.append("log=").append(filePath+fileName).append(".log ");
sb.append("bad=").append(filePath+fileName).append(".bad ");
sb.append("readsize=21971520 ");
sb.append("streamsize=21971520 ");
sb.append("date_cache=21971520 ");
sb.append("columnarrayrows=760000 ");
sb.append("bindsize=5000 ");
sb.append("direct=Y ");
String dos = sb.toString();
logger.info("ALL_BASE_DATA-sqlldr命令L:{},基础资料全量导入", dos);
Process process = null;
try {
process = Runtime.getRuntime().exec(dos);
InputStream ins = process.getInputStream(); // 获取执行cmd命令后的信息;
BufferedReader reader = new BufferedReader(new InputStreamReader(ins));
String line = null;
while ((line = reader.readLine()) != null){
String msg = new String(line.getBytes("ISO-8859-1"), CHARSET);
logger.info(msg);// 输出
}
int exitValue = process.waitFor();
logger.info(exitValue==0 ? "返回值:{},数据导入成功!" : "返回值:{},数据导入失败!", exitValue);
} catch (Exception e) {
logger.error("执行sqlldr脚本异常:\r\n{}",e);
} finally {
process.getOutputStream().close(); // 关闭
}
}
其中allBaseDataDBVo类 只是导出导入数据库的用户名密码 ip之类的pojo
2.当然以上只是给出相应的文件生成,有了这些 ,在java层面 我们可以使用并发库等技术 生成导入导出等等
具体细节不在表述