环境:windows7,jvm内存设置14G,kettle5.1后来升级到5.4,oracle作为资源库。
问题背景:我们通过web页面管理kettle的job运行,这只是一个管理界面,即使web项目停掉也不会影响job的运行情况,实际运行job的是后台程序,随着job数量的增多,达到三四百个时,job的运行速度也达到了难以接受的程度。
方案1:
针对出现的问题,经测试发现,job一经运行就不会再重新从资源库读取了(针对定时运行的job),job中的转换则每次都会重新从资源库中读取,我找到了org.pentaho.di.job.entries.trans.JobEntryTrans这个类,这是一个作业控件,代表一个转换,调试跟踪代码就会发现他确实会在每次运行时重新读取资源库加载转换,因为kettle每次运行时都是克隆了这个控件,然后运行时重新加载,具体还是调试跟踪代码才能更清楚的了解,对于他为什么要克隆,我也不是很了解,当然也不敢乱动相关代码,估计因为里面有些状态属性吧。
解决问题的思路是尽量从底层,具体的问题点解决问题,一开始就想过优化具体读取转换的过程,但没有去尝试,而是采用了更具针对性,更简单的方式,在JobEntryTrans这个类里面建了个静态Map,用于缓存读取过的转换,这个缓存的生命周期与对应的job差不多,因为每次从数据库读取JobEntryTrans这个控件时,都会清除该控件引用的转换的缓存,若多个job都引用了这个转换,那他的生命周期比job还短。
这个方案需要改的代码量很小,读取转换时,先查看缓存,没有就读取资源库,然后缓存,下次就直接用缓存了,其次就是在JobEntryTrans类的读取资源库的方法中加上清除对应转换缓存的代码。
缓存??到底以什么形式缓存?为了尽量减小对原有逻辑的影响我先缓存了xml,就是调用了TransMeta的getXml方法,下次读取时就直接使用xml,这个方式在测试环境是没问题的,但在正式环境中始终有个转换有问题,运行异常,正式环境不方便调试,于是我就改为直接缓存TransMeta对象,每次就直接用了,这对写代码来说肯定更简单了,但这个对象里面到底有哪些东西,我没有精力去仔细分析,抱着试一试的态度实现了,经测试效果还是很不错的,缓存xml时的问题不存在了,经过一段时间的运行,没有发现什么大问题,就是记录的日志感觉有点问题,该方案现在依然在使用中,基本没有影响kettle完成它的工作。
这个代码量不大,就贴出来,就是修改了kettle5.4中TransMeta这个类的两个方法:
// Load the jobentry from repository // public void loadRep( Repository rep, IMetaStore metaStore, ObjectId id_jobentry, List<DatabaseMeta> databases, List<SlaveServer> slaveServers ) throws KettleException { try {
//.................这里的源码与官方一致。
passingAllParameters = rep.getJobEntryAttributeBoolean( id_jobentry, "pass_all_parameters", true ); if(transMetaMap.containsKey(getDirectory()+"/"+getName())){ logBasic( "该转换已经缓存,马上移除缓存:" + getDirectory()+"/"+getName() ); transMetaMap.remove(getDirectory()+"/"+getName()); } } catch ( KettleDatabaseException dbe ) { throw new KettleException( "Unable to load job entry of type 'trans' from the repository for id_jobentry=" + id_jobentry, dbe ); } } public TransMeta getTransMeta( Repository rep, IMetaStore metaStore, VariableSpace space ) throws KettleException { try { TransMeta transMeta = null; switch( specificationMethod ) { case FILENAME: long start = new Date().getTime(); String filename = space.environmentSubstitute( getFilename() ); logBasic( "Loading transformation from XML file [" + filename + "]" ); transMeta = new TransMeta( filename, metaStore, null, true, this, null ); log.logBasic(transMeta.getName()+",从文件读取转换耗时:"+(new Date().getTime()-start)); break; case REPOSITORY_BY_NAME: if(transMetaMap.containsKey(getDirectory()+"/"+getName())){ logBasic( "该转换已经缓存,直接使用缓存:" + getDirectory()+"/"+getName() ); transMeta = transMetaMap.get(getDirectory()+"/"+getName()); }else{ String transname = space.environmentSubstitute( getTransname() ); String realDirectory = space.environmentSubstitute( getDirectory() ); logBasic( BaseMessages.getString( PKG, "JobTrans.Log.LoadingTransRepDirec", transname, realDirectory ) ); if ( rep != null ) { // // It only makes sense to try to load from the repository when the // repository is also filled in. // // It reads last the last revision from the repository. // RepositoryDirectoryInterface repositoryDirectory = rep.findDirectory( realDirectory ); transMeta = rep.loadTransformation( transname, repositoryDirectory, null, true, null ); transMetaMap.put(getDirectory()+"/"+getName(), transMeta); logBasic( "从资源库获取转换并缓存:" + getDirectory()+"/"+getName() ); } else { throw new KettleException( BaseMessages.getString( PKG, "JobTrans.Exception.NoRepDefined" ) ); } } break; case REPOSITORY_BY_REFERENCE: if ( transObjectId == null ) { throw new KettleException( BaseMessages.getString( PKG, "JobTrans.Exception.ReferencedTransformationIdIsNull" ) ); } if ( rep != null ) { // Load the last revision // transMeta = rep.loadTransformation( transObjectId, null ); } break; default: throw new KettleException( "The specified object location specification method '" + specificationMethod + "' is not yet supported in this job entry." ); } if ( transMeta != null ) { // copy parent variables to this loaded variable space. // transMeta.copyVariablesFrom( this ); // Pass repository and metastore references // transMeta.setRepository( rep ); transMeta.setMetaStore( metaStore ); } return transMeta; } catch ( Exception e ) { throw new KettleException( BaseMessages.getString( PKG, "JobTrans.Exception.MetaDataLoad" ), e ); } }
方案1总结:
1.该方案解决了转换重复加载的问题,在一次加载后,进行了缓存,并给出了清除缓存重新加载的机制,实际使用效果是:第一次运行仍然很慢,但之后就是飞一般的感觉,之前没有数据每次都要运行十来分钟,现在就只需几秒钟,证明这个问题就是导致kettle运行慢的唯一原因,这里我们不完美的解决了他。
2.该方案第一次运行job仍然很慢,日志记录可能也有问题,对时效性要求高的话,可能该方案还不能完全解决问题,因为重启后台程序时,job的延迟仍然可能达到一个多小时,当然也就刚重启那一下的事,就看你的业务能不能接受了。
方案2:
方案1中重启后台程序导致的高延迟,我们的业务上仍然不能忍受,于是开始思考其他解决方案。
通过尝试解决这个问题,认为想通过改进kettle从数据库读取转换的过程来优化是行不通的,因为kettle读取资源库的过程很复杂,涉及很多逻辑(仔细看过相关代码,并测试了各个关键操作的耗时,对测试出的耗时操作进行分析后认为很难优化),另外经测试,文件资源库读取转换要比数据库资源库快近100倍,于是决定采用如下方式解决:
1.创建job等过程不做任何改变,所有操作都对数据库资源库。
2.后台job在获取一些job信息时也使用数据库资源库,但在最后一步运行job时,获取文件资源库中的job并运行,这里就要求job启动时,两个资源库相关job要相同,特别是job名称和路径,这是确认同一个job的方式,实际运行的就是文件资源库中的job了。
3.这里就涉及到文件资源库与数据库资源库的同步问题了:
1)人工手动将数据库资源库导出到指定的文件资源库。
2)页面控制job的启动与停止,只在页面请求启动job时后台自动将该job相关信息同步到文件资源库(待实现),其他地方不做任何改变,即使后台程序重启,也不用同步job,这样重启后台程序时就不会出现几十分钟job都没有运行起来的情况了。
3)当我们修改了job,确认要运行了,这时就在页面先停止job,再启动就实现同步了。
以上只是问题解决思路,具体实现也实现了个大概,资源库同步还有问题,通过以上方式实际并没有解决数据库资源库读取慢的问题,这需要对kettle有更深入的了解的技术大牛来解决。
这个解决方案达到的效果:
1.解决了后台程序重启时高延迟的问题(主要问题)。
2.间接减轻了数据库资源库的压力,这样我们创建修改job时就会快一些。
3.算是备份了一下资源库。
4.给出了同步资源库的方案,其实这里的文件资源库与上面的缓存很相似,只是缓存得更彻底。