症状:项目更新以后,各省的id数总是出校异常波动,特别是山东的
解决:
首先考虑是不是数据源的问题,如果数据清洗不规范,某个字段缺失,可能造成此问题,分析各个接口一段时间数据量的波动,如果波动曲线差不多,就可能是数据源的问题。
经过分析一段时候回来的数据以后,发现各个接口数据的波动并不是相似的,说明不是数据源的问题。
接着再考虑是不是kv接口网络的问题,即数据挖出来了,但是没有发送回来,分析一段时间id接口回来的数据量和senduserinfotask接口read的数据量比较,如果波动曲线差不多,就可以说明kv接口每问题。
曲线波动差不多,说明kv接口没有问题
特别说明:senduserinfotask接口是每24小时发送一次:sendUserInfoTask类中代码如下:
if (!"03".equals(FileSuffixTime.getHour())) { return RepeatStatus.FINISHED; }
每次跑完三点的数据以后,就将一天的数据发送回来。
再看FileSuffixTime类可知,线上项目是跑四个小时之前的数据,FileSuffixTime类中,代码如下:
private static String date = DateUtils.getFormat(DateUtils.addHours(new Date(), -4), "yyyyMMdd"); private static String hour = DateUtils.getFormat(DateUtils.addHours(new Date(), -4), "HH");
电信运维ftp的日志格式是pxene-start-yh052507.log,后面一串代表跑数据的实际时间,时延是4个小时,所以应该在7点的日志去找userinfotask有发送操作的日志。
接下来分析代码的逻辑,看看是不是中间的getuserinfotask的过滤逻辑有问题,滤掉了有用的id:
通过分析getuserinfotask写出的数据和senduserinfotask读入的数据的曲线分析,这个问题也可以排除。
按照程序累加的逻辑,0---3,数据应该应该是升序,4-7,数据应该开始新的升序,可是并没有,由此可见,问题出在了数据的累加getuserinforeducer中的累加逻辑上。
问题定位了后面的事情就好办了。
bug修改一:getuserinforedcer的cleanup方法中,分隔符确实
bug修改一:getuserinforedcer的cleanup方法中,context.write忘写
bug修改三:getuserinforeducer的setup方法中用普通的io方法读取sequencefile格式的文件
getuserinfotask中上个小时数据回朔:
// 上次结果的路径 String lastPath = null; // 4点统计新的一天的数据 if (!"04".equals(FileSuffixTime.getHour())) { // 回溯8小时的数据,如果没有就重新跑数据,防止数据间隔的情况 for (int i=1; i<=8; i++) { String lastDatehour = DateUtils.getFormat(DateUtils.addHours(DateUtils.getDate(datehour, "yyyyMMddHH"), -i), "yyyyMMddHH"); // 03点的数据已经发送过,如果回溯到03点就跳出循环 if (lastDatehour.endsWith("03")) { log.info(BaseConstant.LOG_PREFIX + "last path is not existed"); break; } lastPath = lastPathTmpl.replace("${datehour}", lastDatehour); if (hadoopFs.exists(new Path(lastPath))) { log.info(BaseConstant.LOG_PREFIX + "last path is " + lastPath); break; } else { lastPath = null; } } }
getuserinfotask中回朔数据分发
// 将上次结果分发 if (lastPath != null) { FileStatus[] statuses = hadoopFs.globStatus(new Path(lastPath+"*")); for (FileStatus status : statuses) { String path = status.getPath().toString(); if (!path.endsWith("_SUCCESS") && !path.endsWith("_temporary")) { job.addCacheFile(new URI(status.getPath().toString())); } } }
getuserinforeducer中上个小时cache的读入:
URI[] uris = context.getCacheFiles(); if (uris != null) { Reader reader = new SequenceFile.Reader(conf, Reader.file(new Path(uri)), Reader.bufferSize(1024 * 20)); LongWritable key = new LongWritable(); Text value = new Text(); while (reader.next(key, value)) { //do something } }
最后再次测试:连续跑几个小时的数据(可以写一个shell脚本,命令之间用&&连接)统计getuserinfotask写出数据的变化,满足逻辑,即4点以前的数据持续增多,4点作为新增的起点。自此,bug解决
java -jar /home/vendorwins/chenjinghui/pxene-telecom-3.0.8-SNAPSHOT.jar -testDataOn 2016051501 > /home/vendorwins/chenjinghui/log/sd01.out.log && java -jar /home/vendorwins/chenjinghui/pxene-telecom-3.0.8-SNAPSHOT.jar -testDataOn 2016051502 > /home/vendorwins/chenjinghui/log/sd02.out.log && java -jar /home/vendorwins/chenjinghui/pxene-telecom-3.0.8-SNAPSHOT.jar -testDataOn 2016051503 > /home/vendorwins/chenjinghui/log/sd03.out.log && java -jar /home/vendorwins/chenjinghui/pxene-telecom-3.0.8-SNAPSHOT.jar -testDataOn 2016051504 > /home/vendorwins/chenjinghui/log/sd04.out.log && java -jar /home/vendorwins/chenjinghui/pxene-telecom-3.0.8-SNAPSHOT.jar -testDataOn 2016051505 > /home/vendorwins/chenjinghui/log/sd05.out.log