题目:
20170308,黄渤,光环斗地主,8,360手机助手,0.1版本,北京
20170308,黄渤,光环斗地主,5,360手机助手,0.1版本,北京
20170308,黄渤,光环斗地主,7,360手机助手,0.1版本,北京
20170308,黄渤,光环斗地主,10,360手机助手,0.2版本,北京
20170308,黄渤,光环斗地主,9,360手机助手,0.2版本,北京
20170308,黄渤,光环斗地主,23,360手机助手,0.2版本,北京
20170308,黄渤,光环斗地主,22,360手机助手,0.2版本,北京
20170308,黄渤,光环斗地主,14,360手机助手,0.3版本,北京
20170308,黄渤,光环斗地主,13,360手机助手,0.3版本,北京
20170308,黄渤,光环斗地主,16,360手机助手,0.4版本,北京
20170308,黄渤,光环斗地主,18,360手机助手,0.4版本,北京
20170308,黄渤,光环斗地主,19,360手机助手,0.4版本,北京
20170308,黄渤,光环斗地主,15,360手机助手,0.4版本,北京
20170309,徐峥,光环斗地主,8,360手机助手,0.1版本,北京
20170309,徐峥,光环斗地主,5,360手机助手,0.1版本,北京
20170309,徐峥,光环斗地主,6,360手机助手,0.1版本,北京
20170309,徐峥,光环斗地主,10,360手机助手,0.2版本,北京
20170309,徐峥,光环斗地主,12,360手机助手,0.2版本,北京
20170309,徐峥,光环斗地主,11,360手机助手,0.3版本,北京
20170309,徐峥,光环斗地主,9,360手机助手,0.2版本,北京
20170309,徐峥,光环斗地主,23,360手机助手,0.2版本,北京
20170309,徐峥,光环斗地主,22,360手机助手,0.2版本,北京
20170309,徐峥,光环斗地主,14,360手机助手,0.3版本,北京
20170309,徐峥,光环斗地主,13,360手机助手,0.3版本,北京
20170309,徐峥,光环斗地主,16,360手机助手,0.4版本,北京
20170309,徐峥,光环斗地主,18,360手机助手,0.4版本,北京
20170309,徐峥,光环斗地主,19,360手机助手,0.5版本,北京
20170309,徐峥,光环斗地主,15,360手机助手,0.4版本,北京
字段信息:
时间,用户名,游戏名,小时,数据来源,游戏版本,用户所在地
date, name, game, hour, source, version, city
题目要求:
在所有有版本变动的记录后面追加一条字段信息:该信息就是上一个版本的版本号,只限同用户
例如:
20170308,黄渤,光环斗地主,10,360手机助手,0.2版本,北京
20170308,黄渤,光环斗地主,13,360手机助手,0.3版本,北京,0.2版本
20170308,徐峥,光环斗地主,14,360手机助手,0.3版本,北京
20170308,徐峥,光环斗地主,15,360手机助手,0.4版本,北京,0.3版本
用户“黄渤”在10点钟是0.2版本,但是到了13点变成了0.3版本,那么就在13点钟这条记录的后面追加一个字段值0.2版本,也就是上个版本的版本号
当然,为什么从10点直接到了13点,因为11点和12点的数据没有收集到。
解题思路:使用类对数据进行封装,排序规则:人名 > 日期 >时间
然后在reduce方法,先读取第一个对象(第一个没法参与比较)的版本,然后获取下一个进行比较,如果版本不一致,在后面追加版本信息改变。
封装类代码:
/** * @author: lpj * @date: 2018年3月15日 上午10:46:05 * @Description: */package lpj.reduceWorkbean;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.WritableComparable;/** * */public class Game implements WritableComparable<Game>{ private String date; private String name; private String gameName; private int hours; private String whereFrom; private String version; private String location; public String getDate() { return date; } public void setDate(String date) { this.date = date; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getGameName() { return gameName; } public void setGameName(String gameName) { this.gameName = gameName; } public int getHours() { return hours; } public void setHours(int hours) { this.hours = hours; } public String getWhereFrom() { return whereFrom; } public void setWhereFrom(String whereFrom) { this.whereFrom = whereFrom; } public String getVersion() { return version; } public void setVersion(String version) { this.version = version; } public String getLocation() { return location; } public void setLocation(String location) { this.location = location; } /** * @param date * @param name * @param gameName * @param hours * @param whereFrom * @param version * @param location */ public Game(String date, String name, String gameName, int hours, String whereFrom, String version, String location) { super(); this.date = date; this.name = name; this.gameName = gameName; this.hours = hours; this.whereFrom = whereFrom; this.version = version; this.location = location; } /** * */ public Game() { // TODO Auto-generated constructor stub } @Override public String toString() { return date + "\t" + name + "\t" + gameName + "\t" + hours + "\t" + whereFrom + "\t" + version + "\t" + location ; } /* (non-Javadoc) * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) */ @Override public void readFields(DataInput in) throws IOException { date = in.readUTF(); name = in.readUTF(); gameName = in.readUTF(); hours = in.readInt(); whereFrom = in.readUTF(); version = in.readUTF(); location = in.readUTF(); } /* (non-Javadoc) * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) */ @Override public void write(DataOutput out) throws IOException { out.writeUTF(date); out.writeUTF(name); out.writeUTF(gameName); out.writeInt(hours); out.writeUTF(whereFrom); out.writeUTF(version); out.writeUTF(location); } /* (non-Javadoc) * @see java.lang.Comparable#compareTo(java.lang.Object) */ @Override public int compareTo(Game o) { //最先按照人名排序 if (this.name.equals(o.name)) { //先按照时间排序 if (this.date.equals(o.date)) { //再次按照时间排序 if (this.hours == o.hours) { return 0; }else { return this.hours - (o.hours) > 0 ? 1 : -1; } }else { return this.date.compareTo(o.date) > 0 ? 1 : -1; } }else { return this.name.compareTo(o.name) > 0 ? 1 : -1; } } }
分组代码:
/** * @author: lpj * @date: 2018年3月16日 下午10:36:55 * @Description: */package lpj.reduceWorkbean;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;/** * */public class MyGroup2 extends WritableComparator{ public MyGroup2() { super(Game.class,true);//创建对象 } @Override public int compare(WritableComparable a, WritableComparable b) { Game s1 = (Game)a; Game s2 = (Game)b; return s1.getName().compareTo(s2.getName());//设置人名分组器 } }
主体逻辑代码:
/** * @author: lpj * @date: 2018年3月16日 下午7:16:47 * @Description: */package lpj.reduceWork;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.shell.Count;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import lpj.reduceWorkbean.Game;import lpj.reduceWorkbean.MyGroup2;/** * */public class VersionChangeMR2 { public static void main(String[] args) throws Exception { Configuration conf = new Configuration();// conf.addResource("hdfs-site.xml");//使用配置文件// System.setProperty("HADOOP_USER_NAME", "hadoop");//使用集群 FileSystem fs = FileSystem.get(conf);//默认使用本地 Job job = Job.getInstance(conf); job.setJarByClass(VersionChangeMR2.class); job.setMapperClass(VersionChangeMR_Mapper.class); job.setReducerClass(VersionChangeMR_Reducer.class); job.setMapOutputKeyClass(Game.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setGroupingComparatorClass(MyGroup2.class); Path inputPath = new Path("d:/a/homework7.txt"); Path outputPath = new Path("d:/a/homework7"); if (fs.exists(inputPath)) { fs.delete(outputPath, true); } FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); boolean isdone = job.waitForCompletion(true); System.exit(isdone ? 0 : 1); } public static class VersionChangeMR_Mapper extends Mapper<LongWritable, Text, Game, NullWritable>{ Text kout = new Text(); Text valueout = new Text();// Game ga =new Game(); @Override protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException { String [] reads = value.toString().trim().split(","); //20170309,徐峥,光环斗地主,15,360手机助手,0.4版本,北京 Game ga1 = new Game(reads[0],reads[1],reads[2],Integer.parseInt(reads[3]) ,reads[4],reads[5],reads[6]); context.write(ga1, NullWritable.get()); } } public static class VersionChangeMR_Reducer extends Reducer<Game, NullWritable, Text, NullWritable>{ Text kout = new Text(); Text valueout = new Text(); Game ga = new Game(); @Override protected void reduce(Game key, Iterable<NullWritable> values, Context context)throws IOException, InterruptedException { int count = 0; String firstVersion = "";//记录初始版本号 for(NullWritable niv : values){ count++; if (count == 1) { firstVersion = key.getVersion();//第一次版本不需比较 String kk = key.toString(); kout.set(kk); context.write(kout, NullWritable.get()); }else { if (!(firstVersion.equals(key.getVersion()))) { String kk = key.toString() + "\t" + firstVersion; kout.set(kk); context.write(kout, NullWritable.get()); }else { String kk = key.toString(); kout.set(kk); context.write(kout, NullWritable.get()); } firstVersion = key.getVersion();//将比较完后的版本作为初始版本 } } } }}
运行结果:
20170309 徐峥 光环斗地主 5 360手机助手 0.1版本 北京20170309 徐峥 光环斗地主 6 360手机助手 0.1版本 北京20170309 徐峥 光环斗地主 8 360手机助手 0.1版本 北京20170309 徐峥 光环斗地主 9 360手机助手 0.2版本 北京 0.1版本20170309 徐峥 光环斗地主 10 360手机助手 0.2版本 北京20170309 徐峥 光环斗地主 11 360手机助手 0.3版本 北京 0.2版本20170309 徐峥 光环斗地主 12 360手机助手 0.2版本 北京 0.3版本20170309 徐峥 光环斗地主 13 360手机助手 0.3版本 北京 0.2版本20170309 徐峥 光环斗地主 14 360手机助手 0.3版本 北京20170309 徐峥 光环斗地主 15 360手机助手 0.4版本 北京 0.3版本20170309 徐峥 光环斗地主 16 360手机助手 0.4版本 北京20170309 徐峥 光环斗地主 18 360手机助手 0.4版本 北京20170309 徐峥 光环斗地主 19 360手机助手 0.5版本 北京 0.4版本20170309 徐峥 光环斗地主 22 360手机助手 0.2版本 北京 0.5版本20170309 徐峥 光环斗地主 23 360手机助手 0.2版本 北京20170308 黄渤 光环斗地主 5 360手机助手 0.1版本 北京20170308 黄渤 光环斗地主 7 360手机助手 0.1版本 北京20170308 黄渤 光环斗地主 8 360手机助手 0.1版本 北京20170308 黄渤 光环斗地主 9 360手机助手 0.2版本 北京 0.1版本20170308 黄渤 光环斗地主 10 360手机助手 0.2版本 北京20170308 黄渤 光环斗地主 13 360手机助手 0.3版本 北京 0.2版本20170308 黄渤 光环斗地主 14 360手机助手 0.3版本 北京20170308 黄渤 光环斗地主 15 360手机助手 0.4版本 北京 0.3版本20170308 黄渤 光环斗地主 16 360手机助手 0.4版本 北京20170308 黄渤 光环斗地主 18 360手机助手 0.4版本 北京20170308 黄渤 光环斗地主 19 360手机助手 0.4版本 北京20170308 黄渤 光环斗地主 22 360手机助手 0.2版本 北京 0.4版本20170308 黄渤 光环斗地主 23 360手机助手 0.2版本 北京