java 集成kettle

时间:2024-03-31 14:09:58
java集成kettle 示例:

方案:
启用定时任务,调用PDI生成的trans.ktr文件

<repository>
    <id>pentaho-releases</id>
    <url>http://repository.pentaho.org/artifactory/repo/</url>
  </repository>

  
<kettle.version>6.1.0.1-196</kettle.version>


     <dependencies>
        <dependency>
            <groupId>pentaho-kettle</groupId> 
            <artifactId>kettle-core</artifactId> 
            <version>${kettle.version}</version>  
        </dependency> 
         <dependency> 
             <groupId>com.verhas</groupId> 
             <artifactId>license3j</artifactId> 
             <version>1.0.7</version>
         </dependency> 
        <dependency> 
            <groupId>pentaho-kettle</groupId> 
            <artifactId>kettle-dbdialog</artifactId> 
            <version>${kettle.version}</version>  
        </dependency> 
        <dependency> 
            <groupId>pentaho-kettle</groupId> 
            <artifactId>kettle-engine</artifactId> 
            <version>${kettle.version}</version>  
        </dependency> 
        <dependency> 
            <groupId>pentaho</groupId> 
            <artifactId>metastore</artifactId> 
            <version>${kettle.version}</version> 
        </dependency>
        <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>19.0</version>
           </dependency>
     </dependencies>


DEMO:


java 集成kettle
211-whicoDB-in_student(table) 插入更新到  localhost-whicoDB-in_student(table) 的trans文件

import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
/**
 * kettle version 6.1.0.1-196
 * @author jiangnan
 *
 */
public class KettleUtil {
     
     /**
      * 调用trans文件
      * @param transFileName
      * @throws Exception
      */
     public static void callNativeTrans(String transFileName) throws Exception{
           callNativeTransWithParams(null, transFileName);
     }
     
     /**
      * 调用trans文件 带参数的
      * @param params
      * @param transFileName
      * @throws Exception
      */
     public static void callNativeTransWithParams(String[] params ,String transFileName) throws Exception{
           // 初始化 
           KettleEnvironment.init();
         EnvUtil.environmentInit(); 
         TransMeta transMeta = new TransMeta(transFileName);
         //转换
         Trans trans = new Trans(transMeta); 
         //执行
         trans.execute(params);
         //等待结束
         trans.waitUntilFinished();
         //抛出异常 
        if(trans.getErrors() > 0){ 
            throw new Exception("There are errors during transformation exception!(传输过程中发生异常)"); 
        } 
     }
     
     /**
      * 调用job文件
      * @param jobName
      * @throws Exception
      */
     public static void callNativeJob(String jobName) throws Exception{
           // 初始化 
           KettleEnvironment.init();
        
           JobMeta jobMeta = new JobMeta(jobName,null);
           Job job = new Job(null, jobMeta);  
           //向Job 脚本传递参数,脚本中获取参数值:${参数名}  
           //job.setVariable(paraname, paravalue);  
            job.start();  
            job.waitUntilFinished();  
            if (job.getErrors() > 0) {
            throw new Exception("There are errors during job exception!(执行job发生异常)"); 
            }  
     }




import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.stech.business.util.KettleUtil;

public class kettleTaskJob {
     private static Logger LOGGER = LogManager.getLogger(kettleTaskJob.class.getName());

      public void run() throws Exception {
          LOGGER.info("*****kettle定时任务运行开始******");
          String transFileName = "D:/work/kettle/trans.ktr";
          KettleUtil.callNativeTrans(transFileName);
          LOGGER.info("*****kettle定时任务运行结束******");
      }
     
      public static void main(String[] args) throws Exception {
            kettleTaskJob job = new kettleTaskJob();
            job.run();
     }
}


2016-11-15 11:47:33,856 INFO  (VfsLog.java:136) - Using "C:\Users\jiangnan\AppData\Local\Temp\vfs_cache" as temporary files store.
2016/11/15 11:47:34 - trans - 为了转换解除补丁开始  [trans]
2016/11/15 11:47:37 - 表输入.0 - Finished reading query, closing connection.
2016/11/15 11:47:37 - 表输入.0 - 完成处理 (I=47, O=0, R=0, W=47, U=0, E=0)
2016/11/15 11:47:38 - 插入 / 更新.0 - 完成处理 (I=47, O=47, R=47, W=47, U=0, E=0)


异常问题解决:

2016-11-15 13:30:41,438 INFO  (VfsLog.java:136) - Using "C:\Users\jiangnan\AppData\Local\Temp\vfs_cache" as temporary files store.
Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.base.Suppliers.supplierFunction()Lcom/google/common/base/Function;
     at org.pentaho.di.core.extension.ExtensionPointMap.get(ExtensionPointMap.java:120)
     at org.pentaho.di.core.extension.ExtensionPointHandler.callExtensionPoint(ExtensionPointHandler.java:45)
     at org.pentaho.di.trans.TransMeta.loadXML(TransMeta.java:3368)
     at org.pentaho.di.trans.TransMeta.<init>(TransMeta.java:2732)
     at org.pentaho.di.trans.TransMeta.<init>(TransMeta.java:2684)
     at org.pentaho.di.trans.TransMeta.<init>(TransMeta.java:2661)
     at org.pentaho.di.trans.TransMeta.<init>(TransMeta.java:2641)
     at org.pentaho.di.trans.TransMeta.<init>(TransMeta.java:2606)
     at org.pentaho.di.trans.TransMeta.<init>(TransMeta.java:2569)
     at com.stech.business.util.KettleUtil.callNativeTrans(KettleUtil.java:21)
     at com.stech.business.quartz.kettleTaskJob.run(kettleTaskJob.java:13)
     at com.stech.business.quartz.kettleTaskJob.main(kettleTaskJob.java:19)


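原因:上面的 NoSuchMethodError 出在 guava 的类(com.google.common.base.Suppliers)上,说明 classpath 中存在与 kettle 不兼容的 guava 版本,一般是被其他依赖传递引入的。
解决:在项目中做依赖传递排除(对引入冲突 guava 的依赖加 exclusion),并像上面 pom 中那样显式声明 guava 19.0。
项目中依赖传递排除: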
java 集成kettle

0
 
0