Java执行kettle文件
添加依赖:
<dependency> <groupId>pentaho-kettle</groupId> <artifactId>kettle-engine</artifactId> <version>7.0.0.0-25</version> </dependency> <dependency> <groupId>pentaho-kettle</groupId> <artifactId>kettle-core</artifactId> <version>7.0.0.0-25</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.6</version> </dependency>
工具类
package com.example.kettledemo.util;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Map;

import org.pentaho.di.core.Const;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;

/**
 * Static helpers for running Kettle transformation files and job files from Java code.
 *
 * @Package: com.example.kettledemo.util
 * @Since: 2020/5/13 14:26
 * @Version: V1.0
 */
public class kettleUtil {

    /**
     * Runs a transformation file without passing any arguments.
     *
     * @param transFileName path of the transformation file to run
     * @throws Exception if initialization or loading fails, or the transformation reports errors
     */
    public static void callNativeTrans(String transFileName) throws Exception {
        callNativeTransWithParams(null, transFileName);
    }

    /**
     * Runs a transformation file, passing the given arguments to it, and blocks until it finishes.
     *
     * @param params        arguments handed to {@code Trans.execute}; may be {@code null}
     *                      (that is what {@link #callNativeTrans(String)} passes)
     * @param transFileName path of the transformation file to run
     * @throws Exception if initialization or loading fails, or the transformation reports
     *                   at least one error
     */
    public static void callNativeTransWithParams(String[] params, String transFileName) throws Exception {
        // Initialize the Kettle environment.
        KettleEnvironment.init();
        EnvUtil.environmentInit();

        // Load the transformation definition and create a runnable transformation from it.
        TransMeta transMeta = new TransMeta(transFileName);
        Trans trans = new Trans(transMeta);

        // Start it and wait for completion.
        trans.execute(params);
        trans.waitUntilFinished();

        // Surface reported errors to the caller.
        if (trans.getErrors() > 0) {
            throw new Exception("There are errors during transformation exception!(传输过程中发生异常)");
        }
    }

    /**
     * Runs a job file and blocks until it finishes.
     *
     * <p>Before the Kettle environment is initialized, {@code Const.JNDI_DIRECTORY} is pointed
     * at the {@code simple-jndi} directory under the classpath root (the directory expected
     * to contain {@code jdbc.properties}).
     *
     * @param jobName path of the job file to run
     * @param map     variables injected into the job; inside the job script a value is
     *                read as <code>${variableName}</code>
     * @throws Exception if the JNDI directory cannot be resolved, initialization or loading
     *                   fails, or the job reports at least one error
     */
    // The raw Map parameter is kept so that existing callers keep compiling.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static void callNativeJob(String jobName, Map map) throws Exception {
        // The JNDI directory is set before the environment is initialized.
        Const.JNDI_DIRECTORY = resolveJndiDirectory();
        KettleEnvironment.init();

        JobMeta jobMeta = new JobMeta(jobName, null);
        Job job = new Job(null, jobMeta);

        // Pass variables to the job script.
        job.injectVariables(map);

        job.start();
        job.waitUntilFinished();

        if (job.getErrors() > 0) {
            throw new Exception("There are errors during job exception!(执行job发生异常)");
        }
    }

    /**
     * Resolves the canonical path of the {@code simple-jndi} directory under the classpath root.
     */
    private static String resolveJndiDirectory() throws IOException, URISyntaxException {
        URL classpathRoot = Thread.currentThread().getContextClassLoader().getResource("");
        if (classpathRoot == null) {
            throw new IllegalStateException(
                    "Cannot locate the classpath root, so the simple-jndi directory cannot be resolved");
        }

        // URL.getPath() is percent-encoded (a space becomes %20, non-ASCII characters become
        // %XX sequences), so it is not a valid file-system path. Going through the URI decodes it.
        // Non-file URLs cannot be converted that way, so they keep the previous behaviour.
        File baseDir = "file".equals(classpathRoot.getProtocol())
                ? new File(classpathRoot.toURI())
                : new File(classpathRoot.getPath());

        return new File(baseDir, "simple-jndi").getCanonicalPath();
    }
}
XXX.kjb中配置的变量:
测试:执行kettle任务
filePath为kettle任务文件路径,param为变量
/**
 * Test endpoint that runs a Kettle job file.
 *
 * @param filePath path of the Kettle job file to run
 * @param param    value exposed to the job as the variable named "param"
 * @throws Exception if running the job fails
 */
@GetMapping("test")
public void test(String filePath, String param) throws Exception {
    // Job variables are String -> String, so the map is typed accordingly.
    HashMap<String, String> variables = new HashMap<>();
    variables.put("param", param);
    kettleUtil.callNativeJob(filePath, variables);
}
配置JNDI
1.将JNDI配置文件(data-integration/simple-jndi/jdbc.properties)放在一个指定的目录下,如项目根目录下simple-jndi/jdbc.properties(注意:下文代码通过getResource("")从运行时classpath根目录下查找simple-jndi目录,因此该目录必须位于运行时的classpath根目录下):
项目根目录下simple-jndi/jdbc.properties:
2.添加simple-jndi的jar依赖。
<dependency> <groupId>simple-jndi</groupId> <artifactId>simple-jndi</artifactId> <version>0.11.4.1</version> </dependency>
3.先将Const.JNDI_DIRECTORY设置为simple-jndi目录的路径,再调用kettle的环境初始化方法KettleEnvironment.init()(无参版本等价于KettleEnvironment.init(true)),加载JNDI
String path = Thread.currentThread().getContextClassLoader().getResource("").getPath(); File file = new File(path+"/simple-jndi");// path是jdbc.prtoperties上层文件夹路径 String sysPath = file.getCanonicalPath(); Const.JNDI_DIRECTORY = sysPath; KettleEnvironment.init();