KETTLE可以实现的数据抽量抽取方式:
- 基于时间戳字段
- 在目标表中必须有时间戳字段(递增的形式) ,根据时间戳来判断是否需要更新,侵入性强。
- 基于标识字段
- 在目标表中必须有一个字段来记录更新状态,侵入性强
- 基于PK的缓存过滤
- 用一张表记录主键和状态来决定是否更新,侵入性强
- 基于CDC
- 基于物化视图
- 基于触发器临时表
- 基于两表比对
KETTLE整库迁移方案:
- 主要流程如下:
整套流程分为:2个(作业)job,4个(转换)trans。
总作业JOB视图:
-
抽取表JOB视图:
- 使用到的Trans插件:检查表是否存在、表输入、表输出、获取表名、字段选择、过滤记录、复制记录到结果、发送邮件、从结果获取记录,设置变量、JAVA代码
-
其中需要注意的是抽取JOB中一定要勾选对每个输入行都执行一次,因为整库的表不止一个所以要勾上这个让它使用每条记录来进行作业:
-
在获取表名转换中是获取表名->字段选择(获取表名也可以获取模式,我这里只要表名字段)->过滤记录(过滤掉你不需要迁移的表)->复制记录到结果(提供给后面的转换使用):
-
接下来就是判断是否存在表,是则跳过创建表结构不是就创建表结构,表结构的转换如下:
-
表输入内容如下(注意一定要能获取值,建表的SQL从这里获取进行转换):
JAVA代码如下:
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException{
// First, get a row from the default input hop
//
Object[] r = getRow();
//本地连接
org.pentaho.di.core.database.DatabaseMeta dbmeta = getTransMeta().findDatabase("Mysql_hanxiu");
if(dbmeta!=null)
{
org.pentaho.di.core.database.Database db=new org.pentaho.di.core.database.Database(dbmeta);
try
{
db.connect();
String tablename = getVariable("TABLENAME");
logBasic("开始创建表:" + tablename);
if(tablename!=null && tablename.trim().length()>0)
{
String sql = db.getDDLCreationTable(tablename, data.inputRowMeta);//${TABLENAME}
db.execStatement(sql.replace(";", ""));
logBasic(sql);
}
}
catch(Exception e)
{
logError("创建表出现异常",e);
}finally{
db.disconnect();
}
}
return false;
}
- 这里的数据库链接是从资源库获取的,如需获取本地须改为DatabaseMeta对象内容为
//本地连接获取数据库元数据
//org.pentaho.di.core.database.DatabaseMeta dbmeta = getTransMeta().findDatabase("target");
-
后面的表抽取的流程是:表输入->表输出
这样子整个JOB就走完了,如果中途出错就发邮件停止,这个是SQL server迁移到Mysql数据库的,整库迁移操作,自动建表结构(可以Oracle、Mysql、SQL Server之间互相转换),只要有两个保存好的本地数据库链接或者资源库中的也可以。需要主要的是表输出的记录数必须最少有一条。
KETTLE调度方式:
调度方式可以采用计划任务方式,但是当作业较多的时候,之间执行顺序,相互依赖关系,虽然所有作业都可以写到一个总的作业上,但是这样就把可以模块化的作业塞到一起,作业之间高度耦合,还会可能出现内存溢出的问题,为了解决问题我正在学习JenKins调度平台。
用JenKins可以解决集中管理作业/转换的调度,以及每次调度的日志保存,处理作业调度依赖性问题等。
Jenkins是基于Java开发的一种持续集成工具,用于监控持续重复的工作,功能包括:
- 持续的软件版本发布/测试项目。
- 监控外部调用执行的工作。