电信数据挖掘中ID过少问题解决总结

时间:2022-09-05 12:02:48

症状:项目更新以后,各省的id数总是出校异常波动,特别是山东的

解决:

首先考虑是不是数据源的问题,如果数据清洗不规范,某个字段缺失,可能造成此问题,分析各个接口一段时间数据量的波动,如果波动曲线差不多,就可能是数据源的问题。

经过分析一段时候回来的数据以后,发现各个接口数据的波动并不是相似的,说明不是数据源的问题。


接着再考虑是不是kv接口网络的问题,即数据挖出来了,但是没有发送回来,分析一段时间id接口回来的数据量和senduserinfotask接口read的数据量比较,如果波动曲线差不多,就可以说明kv接口每问题。

曲线波动差不多,说明kv接口没有问题

特别说明:senduserinfotask接口是每24小时发送一次:sendUserInfoTask类中代码如下:

		if (!"03".equals(FileSuffixTime.getHour())) {
			return RepeatStatus.FINISHED;
		}

每次跑完三点的数据以后,就将一天的数据发送回来。

再看FileSuffixTime类可知,线上项目是跑四个小时之前的数据,FileSuffixTime类中,代码如下:

	private static String date = DateUtils.getFormat(DateUtils.addHours(new Date(), -4), "yyyyMMdd");
	private static String hour = DateUtils.getFormat(DateUtils.addHours(new Date(), -4), "HH");


电信运维ftp的日志格式是pxene-start-yh052507.log,后面一串代表跑数据的实际时间,时延是4个小时,所以应该在7点的日志去找userinfotask有发送操作的日志。


接下来分析代码的逻辑,看看是不是中间的getuserinfotask的过滤逻辑有问题,滤掉了有用的id:

通过分析getuserinfotask写出的数据和senduserinfotask读入的数据的曲线分析,这个问题也可以排除。


按照程序累加的逻辑,0---3,数据应该应该是升序,4-7,数据应该开始新的升序,可是并没有,由此可见,问题出在了数据的累加getuserinforeducer中的累加逻辑上。

问题定位了后面的事情就好办了。

bug修改一:getuserinforedcer的cleanup方法中,分隔符确实

bug修改一:getuserinforedcer的cleanup方法中,context.write忘写

bug修改三:getuserinforeducer的setup方法中用普通的io方法读取sequencefile格式的文件

getuserinfotask中上个小时数据回朔:

		// 上次结果的路径
		String lastPath = null;
		// 4点统计新的一天的数据
		if (!"04".equals(FileSuffixTime.getHour())) {
			// 回溯8小时的数据,如果没有就重新跑数据,防止数据间隔的情况
			for (int i=1; i<=8; i++) {
				String lastDatehour = DateUtils.getFormat(DateUtils.addHours(DateUtils.getDate(datehour, "yyyyMMddHH"), -i), "yyyyMMddHH");
				// 03点的数据已经发送过,如果回溯到03点就跳出循环
				if (lastDatehour.endsWith("03")) {
					log.info(BaseConstant.LOG_PREFIX + "last path is not existed");
					break;
				}
				lastPath = lastPathTmpl.replace("${datehour}", lastDatehour);
				if (hadoopFs.exists(new Path(lastPath))) {
					log.info(BaseConstant.LOG_PREFIX + "last path is " + lastPath);
					break;
				} else {
					lastPath = null;
				}
			}
		}

getuserinfotask中回朔数据分发

		// 将上次结果分发
		if (lastPath != null) {
			FileStatus[] statuses = hadoopFs.globStatus(new Path(lastPath+"*"));
			for (FileStatus status : statuses) {
				String path = status.getPath().toString();
				if (!path.endsWith("_SUCCESS") && !path.endsWith("_temporary")) {
					job.addCacheFile(new URI(status.getPath().toString()));
				}
			}
		}

getuserinforeducer中上个小时cache的读入:

URI[] uris = context.getCacheFiles();
		if (uris != null) {
			Reader reader = new SequenceFile.Reader(conf, Reader.file(new Path(uri)), Reader.bufferSize(1024 * 20));
				LongWritable key = new LongWritable();
				Text value = new Text();
				while (reader.next(key, value)) {
					//do something
				}
		}

最后再次测试:连续跑几个小时的数据(可以写一个shell脚本,命令之间用&&连接)统计getuserinfotask写出数据的变化,满足逻辑,即4点以前的数据持续增多,4点作为新增的起点。自此,bug解决

java -jar /home/vendorwins/chenjinghui/pxene-telecom-3.0.8-SNAPSHOT.jar -testDataOn 2016051501 > /home/vendorwins/chenjinghui/log/sd01.out.log &&
java -jar /home/vendorwins/chenjinghui/pxene-telecom-3.0.8-SNAPSHOT.jar -testDataOn 2016051502 > /home/vendorwins/chenjinghui/log/sd02.out.log &&
java -jar /home/vendorwins/chenjinghui/pxene-telecom-3.0.8-SNAPSHOT.jar -testDataOn 2016051503 > /home/vendorwins/chenjinghui/log/sd03.out.log &&
java -jar /home/vendorwins/chenjinghui/pxene-telecom-3.0.8-SNAPSHOT.jar -testDataOn 2016051504 > /home/vendorwins/chenjinghui/log/sd04.out.log &&
java -jar /home/vendorwins/chenjinghui/pxene-telecom-3.0.8-SNAPSHOT.jar -testDataOn 2016051505 > /home/vendorwins/chenjinghui/log/sd05.out.log