自定义步骤插件需要实现以下四个接口(不指明都在org.pentaho.di.trans.step包下)
StepMetaInterface 默认实现类BaseStepMeta
说明:维护、校验、序列化配置,提供访问步骤类的入口,影响行布局变化
StepDialogInterface 默认实现类org.pentaho.di.ui.trans.step.BaseStepDialog
说明:配置界面
StepInterface
默认实现类BaseStep
说明:处理行流
StepDataInterface 默认实现类BaseStepData
说明:保存执行状态,作为执行中一个声明和使用变量的地方
在kettle插件中使用图片
首先icon
需SVG 1.1版格式。
然后图片应放在 项目/src 目录下。运行时从jar包文件中加载。图片文件位置信息由你插件中的JobMeta 或 StepMeta指明,通常用一个java注解来实现,例如:
如果需要在步骤UI对象框中使用,可以通过类似以下代码来设置
维护步骤配置(StepMetaInterface
)
StepMetaInterface
的实现类使用私有属性来追中步骤配置,并提供对应get() set()方法。StepDialogInterface 的实现类使用这些getter setter方法来操作用户输入的配置。
void setDefault()方法在每次新步骤创建时,被PDI客户端(Spoon)用来设置重要的默认值。是设置非null属性的绝佳之地,而步骤中大多数属性必须非null。
public Object clone()
方法在创建多个相同步骤时,被调用返回一个深度拷贝的meta对象。
插件将配置数据序列化到xml和PDI资源库
String getXML()
方法在客户端保存转换时被调用,返回一串包含配置数据的xml字符串。一个tag一条配置,org.pentaho.di.core.xml.XMLHandler构建。
void loadXML()
方法在步骤从xml读取配置时被调用,配置数据同样由org.pentaho.di.core.xml.XMLHandler
来读取。
void saveRep()
方法在步骤配置被保存到资源库时被调用。
void readRep()
方法在从资源库读取步骤配置时被调用。
StepMetaInterface
实现类是每个插件的主类,定义了整个步骤的架构。提供下面几个方法来获取其他三个附属组件:
StepDialogInterface getDialog() 返回步骤配置对话框对象。
StepInterface getStep() 返回步骤处理对象。
StepDataInterface getStepData() 返回步骤数据对象。
获知行流的结构信息
void getFields()
方法得到输入行流结构的描述信息,步骤插件修改它以匹配自身输出的字段结构。
校验步骤配置
PDI
客户端支持校验转换的特性,在画布上通过调用每个步骤的check()方法来检查所有步骤的配置。
StepMetaInterface
实现类必须使用Step注解,其属性有:
id
全局唯一
image icon
图片的资源位置
name
步骤标识
description
步骤描述
categoryDescription
步骤应该属于步骤列表的哪个目录。例如输入、输出、转换等
i18nPackageName
提供i18n属性文件的包路径
配置对话框(StepDialogInterface
)
使用swt框架来实现对话框,当你使用客户端打开步骤配置界面时,系统通过传入一个StepMetaInterface对象来实例化dialog对象,并调用其open()方法。
String open()
方法在对话框确认或取消是返回,且必须遵循以下规则:
如果确认:
StepMetaInterface
对象必须更新为最新的配置。如果改动了任何配置,StepMetaInterface对象的Changed标志必须设为true
。open()
方法返回步骤的name。
如果取消:
StepMetaInterface
对象不可改变。StepMetaInterface
对象的Changed标志被设置成原状态
。open()
方法必须返回null。
StepMetaInterface
对象内建的Changed标志可以通过hasChanged()和setChanged()方法来访问。
行数据处理(StepInterface)
StepInterface
实现类有三个重要的方法需要实现,对应步骤的三个生命周期。
init()
初始化步骤时被调用一次。当所有步骤都进行了初始化以后,开始重复调用processRow(),直到收到接收所有行处理完的信号为止。当处理完成后,调用dispose()。
三个方法都传入一个StepMetaInterface和一个StepDataInterface对象,这个两个对象都可以安全的强转为具体实现类对象。
StepInterface
实现类不能声明任何属性,所有属性都用在StepDataInterface对象中。目的是解耦不同StepInterface对象的属性,便于PDI使用不同线程模型来执行转换。
boolean init()
方法在开始执行前被调用。每个步骤有且仅执行一次,比如打开文件、获取数据连接。对于任何继承于BaseStep的实现,必须强制调用super.init()来保证可预期的行为。成功时返回true,失败时返回false。只要任意一个步骤返回false,PDI退出执行转换。
一旦转换开始执行,即可进入一个循环,不断调用每个步骤的processRow(),直到步骤返回false。大多数情况下,每一个步骤读取一行,改变行结构和列,并传递到下一个步骤。而输入、分组、排序等步骤读取一批或缓存一批数据处理后才传递结果到下一个步骤。
在boolean processRow()方法中调用getRow()获取输入行,该方法是个阻塞方法,直到获取一个行对象或null(没有数据)。调用putRow()来传递行到下一个步骤。处理行必须遵循以下规则:
如果没有数据可以处理了,就调用setOutputDone() 并且返回false。如果没有处理完,就一直返回true
BaseStep
类提供了
First标志来帮助实现第一次条用processRow()
。
void dispose()
方法在转换执行结束后被调用,用于释放init()中分配的资源。任何继承与BaseStep的实现类必须强制调用super.dispose()来保证预期的行为。
处理状态保存(StepDataInterface
)
前面已经讲过处理过程中,数据状态的保存。
行操作(RowMetaInterface
)
行流在PDI中以Object[]形式存在,每一列按下标递增的方式依次排列。
PDI
使用内部实现RowMetaInterface的对象来描述和修改行结构。在processRow()方法中可以调用由BaseStep默认提供的getInputRowMeta()来获取行结构。
同样类似PDI提供了内部实现ValueMetaInterface的对象来描述和修改列结构。
第一次执行时为了获取相关行列的信息,可使用RowMetaInterface的以下方法:
indexOfValue(String valueName)
通过列名获取列的行索引
getFieldNames()
获取列名数组,跟行对象数组一一对应
searchValueMeta(String valueName)
根据列名获取列元数据
getValueMeta(int index)
根据索引获取列元数据
getValueMetaList()
获取列元数据数组,其索引与行对象数组一一对应
更多的RowMetaInterface操作请参考:
http://javadoc.pentaho.com/kettle530/kettle-core-5.3.0.0-javadoc/org/pentaho/di/core/row/RowMetaInterface.html
如果希望获取行的一个副本,可调用RowMetaInterface 的cloneRow()
如果希望增加或减少一列,可调用RowDataUtil工具类的静态方法。如调用其resizeArray()来增加一列
如果希望重新构造一行,可调用RowDataUtil的allocateRowData()
更多的RowDataUtil操作请参考:
http://javadoc.pentaho.com/kettle530/kettle-core-5.3.0.0-javadoc/org/pentaho/di/core/row/RowDataUtil.html
PDI
数据类型
ValueMetaInterface
提供一些安全的方法来获取具体的java值
getString()
getInteger()
getNumber()
getBigNumber()
getDate()
getBoolean()
getBinary()
其他数据操作请参考:
http://javadoc.pentaho.com/kettle/org/pentaho/di/core/row/ValueMetaInterface.html
错误处理
若出现错误,可以打印错误日志,也可以通过调用setErrors(1),stopAll(),setOutputDone()或在processRow()返回false或抛出一个KettleException来停止转换。
若你需要将错误行传递到特定的处理步骤,重写BaseStepMeta的
supportsErrorHandling()
只返回true,将允许你在客户端UI上建立错误传输连接。运行时通过putError()传递错误数据。
行计数器
显示处理进度使用,对应界面上这些数值:
linesRead
读。由getRow()调用。
linesWritten
写。由 putRow()调用。
linesInput
输入。调用incrementLinesInput()。
linesOutput
输出。调用incrementLinesOutput()。
linesUpdated
更新。调用incrementLinesUpdated()。
linesSkipped
跳过。调用incrementLinesSkipped()。
linesRejected
拒绝。由 putError()调用。
日志
void logMinimal()
void logBasic()
void logDetailed()
void logDebug()
void logRowlevel()
void logError()
部署步骤插件:
打一个jar包。
创建一个目录,取一个好名,把jar包放进去。
把上步创建的插件目录,放到data-integration/plugins/steps下