Problem:
We are given three data files:
1. users.dat — sample record: 2::M::56::16::70072; fields: UserID BigInt, Gender String, Age Int, Occupation String, Zipcode String (user id, gender, age, occupation, zip code).
2. movies.dat — sample record: 2::Jumanji (1995)::Adventure|Children's|Fantasy; fields: MovieID BigInt, Title String, Genres String (movie id, movie title, movie genres).
3. ratings.dat — sample record: 1::1193::5::978300760; fields: UserID BigInt, MovieID BigInt, Rating Double, Timestamp String (user id, movie id, rating, rating timestamp).
The joined record layout used as job input is: UserID, MovieID, Rating, Timestamp, Gender, Age, Occupation, Zipcode, Title, Genres.
(4) For the female user who watches the most movies (i.e., has the most ratings), take the 10 movies she rated highest and compute the average rating (over all users) of each of them; output (user, movie title, average rating).
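The four jobs below all read a single pre-joined file, /a/totalFilmInfos.txt, whose "::"-separated columns follow the combined field list above; this post does not show how that file was produced. A minimal local sketch of one way to build it, assuming the three .dat files fit in memory (the class name BuildTotalFilmInfos and the file paths are placeholders):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Map;

public class BuildTotalFilmInfos {
    // Read a "::"-delimited file into a map keyed by its first column.
    private static Map<String, String[]> load(String path) throws IOException {
        Map<String, String[]> map = new HashMap<>();
        try (BufferedReader br = new BufferedReader(new FileReader(path))) {
            String line;
            while ((line = br.readLine()) != null) {
                String[] f = line.split("::");
                map.put(f[0], f);
            }
        }
        return map;
    }

    public static void main(String[] args) throws IOException {
        Map<String, String[]> users = load("users.dat");   // UserID, Gender, Age, Occupation, Zipcode
        Map<String, String[]> movies = load("movies.dat"); // MovieID, Title, Genres
        try (BufferedReader br = new BufferedReader(new FileReader("ratings.dat"));
             PrintWriter out = new PrintWriter("totalFilmInfos.txt")) {
            String line;
            while ((line = br.readLine()) != null) {
                String[] r = line.split("::"); // UserID, MovieID, Rating, Timestamp
                String[] u = users.get(r[0]);
                String[] m = movies.get(r[1]);
                if (u == null || m == null) {
                    continue; // skip ratings with no matching user or movie
                }
                // userid::movieid::rate::ts::gender::age::occupation::zipcode::movieName::movieType
                out.println(String.join("::", r[0], r[1], r[2], r[3],
                        u[1], u[2], u[3], u[4], m[1], m[2]));
            }
        }
    }
}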
Analysis: the task breaks down into four steps.
Step 1: count how many ratings each female user has made (key = uid, value = number of ratings).
Step 2: pick the most active viewer, i.e., sort the result of step 1 in descending order of rating count and take the first record.
Step 3: take the 10 movies that user rated highest (key = movieid + rating, sorted in descending order of rating, keep the top 10).
Step 4: compute the average rating of each of those 10 movies and assemble the output: movie id, movie title, average rating, user id.
The work is done with four MapReduce jobs chained serially with JobControl; because each job depends on the previous job's output, that output is loaded into memory in setup() via the distributed cache, which means debugging has to happen on the cluster (a condensed sketch of the chaining pattern follows this analysis).
Step 2 needs a custom key class (to sort by rating count in descending order) and a grouping comparator (to group by gender).
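A condensed sketch of the JobControl chaining just described, reduced to two jobs (the class name ChainSketch is a placeholder; the full driver later in this post wires four jobs the same way):

import java.io.IOException;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

public class ChainSketch {
    /** Runs job1, then job2 once job1 has finished successfully. */
    static void runChained(Job job1, Job job2) throws IOException, InterruptedException {
        ControlledJob step1 = new ControlledJob(job1.getConfiguration());
        ControlledJob step2 = new ControlledJob(job2.getConfiguration());
        step1.setJob(job1);
        step2.setJob(job2);
        step2.addDependingJob(step1);   // step2 starts only after step1 succeeds

        JobControl jc = new JobControl("filmCritic");
        jc.addJob(step1);
        jc.addJob(step2);

        new Thread(jc).start();         // JobControl implements Runnable
        while (!jc.allFinished()) {     // poll until every job has finished
            Thread.sleep(1000);
        }
        jc.stop();
    }
}

The sort key class and grouping comparator for step 2 follow.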
Sorting code (composite key):
/**
 * @author: lpj
 * @date: 2018-03-18
 * @Description: composite key for job 2: (gender, uid, rating count);
 *               sorts by gender, then by rating count in descending order.
 */
package lpj.filmBean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FMaxRateBean implements WritableComparable<FMaxRateBean> {
    private String sex;
    private String uid;
    private int num;

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    public FMaxRateBean(String sex, String uid, int num) {
        this.sex = sex;
        this.uid = uid;
        this.num = num;
    }

    public FMaxRateBean() {
    }

    @Override
    public String toString() {
        return sex + "\t" + uid + "\t" + num;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(sex);
        out.writeUTF(uid);
        out.writeInt(num);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        sex = in.readUTF();
        uid = in.readUTF();
        num = in.readInt();
    }

    @Override
    public int compareTo(FMaxRateBean o) {
        // group by gender first, then sort by rating count in descending order
        int diff = this.sex.compareTo(o.sex);
        if (diff == 0) {
            return o.num - this.num;
        } else {
            return diff;
        }
    }
}
Grouping code:
/**
 * @author: lpj
 * @date: 2018-03-18
 * @Description: grouping comparator for job 2: groups records by gender only,
 *               so one reduce call sees all female users together.
 */
package lpj.filmBean;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class FMaxgroup extends WritableComparator {

    public FMaxgroup() {
        super(FMaxRateBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        FMaxRateBean f1 = (FMaxRateBean) a;
        FMaxRateBean f2 = (FMaxRateBean) b;
        return f1.getSex().compareTo(f2.getSex());
    }
}
Step 3 needs the user's ratings sorted in descending order and the records grouped by user.
Sorting code (composite key):
/**
 * @author: lpj
 * @date: 2018-03-18
 * @Description: composite key for job 3: (uid, movieid, rating);
 *               sorts by uid, then by rating in descending order.
 */
package lpj.filmBean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FUserMoviebean implements WritableComparable<FUserMoviebean> {
    private String uid;
    private String moiveid;
    private int rate;

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    public String getMoiveid() {
        return moiveid;
    }

    public void setMoiveid(String moiveid) {
        this.moiveid = moiveid;
    }

    public int getRate() {
        return rate;
    }

    public void setRate(int rate) {
        this.rate = rate;
    }

    public FUserMoviebean(String uid, String moiveid, int rate) {
        this.uid = uid;
        this.moiveid = moiveid;
        this.rate = rate;
    }

    public FUserMoviebean() {
    }

    @Override
    public String toString() {
        return uid + "\t" + moiveid + "\t" + rate;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(uid);
        out.writeUTF(moiveid);
        out.writeInt(rate);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        uid = in.readUTF();
        moiveid = in.readUTF();
        rate = in.readInt();
    }

    @Override
    public int compareTo(FUserMoviebean o) {
        // sort by uid first, then by rating in descending order
        int diff = this.uid.compareTo(o.uid);
        if (diff == 0) {
            return o.rate - this.rate;
        } else {
            return diff;
        }
    }
}
Grouping code:
/**
 * @author: lpj
 * @date: 2018-03-18
 * @Description: grouping comparator for job 3: groups records by uid only,
 *               so one reduce call sees all of that user's ratings.
 */
package lpj.filmBean;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class FUserMoiveGroup extends WritableComparator {

    public FUserMoiveGroup() {
        super(FUserMoviebean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        FUserMoviebean f1 = (FUserMoviebean) a;
        FUserMoviebean f2 = (FUserMoviebean) b;
        return f1.getUid().compareTo(f2.getUid());
    }
}
Main code (driver and the four jobs):
/**
 * @author: lpj
 * @date: 2018-03-16
 * @Description: four chained MapReduce jobs answering question (4).
 */
package lpj.filmCritic;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import lpj.filmBean.FMaxRateBean;
import lpj.filmBean.FMaxgroup;
import lpj.filmBean.FUserMoiveGroup;
import lpj.filmBean.FUserMoviebean;

public class UserMoiveAvgrateMR {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        // ---------------- job 1: count ratings per female user ----------------
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);
        job.setJarByClass(UserMoiveAvgrateMR.class);
        job.setMapperClass(UserMoiveAvgrateMR_Mapper.class);
        job.setReducerClass(UserMoiveAvgrateMR_Reducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        Path inputPath = new Path("/a/totalFilmInfos.txt");
        Path outputPath = new Path("/a/homework11_4_1");
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // ---------------- job 2: pick the most active female viewer ----------------
        FileSystem fs2 = FileSystem.get(conf);
        Job job2 = Job.getInstance(conf);
        job2.setJarByClass(UserMoiveAvgrateMR.class);
        job2.setMapperClass(UserMoiveAvgrateMR2_Mapper.class);
        job2.setReducerClass(UserMoiveAvgrateMR2_Reducer.class);
        job2.setGroupingComparatorClass(FMaxgroup.class);
        job2.setOutputKeyClass(FMaxRateBean.class);
        job2.setOutputValueClass(NullWritable.class);
        Path inputPath2 = new Path("/a/homework11_4_1");
        Path outputPath2 = new Path("/a/homework11_4_2");
        if (fs2.exists(outputPath2)) {
            fs2.delete(outputPath2, true);
        }
        FileInputFormat.setInputPaths(job2, inputPath2);
        FileOutputFormat.setOutputPath(job2, outputPath2);

        // ---------------- job 3: her 10 highest-rated movies ----------------
        FileSystem fs3 = FileSystem.get(conf);
        Job job3 = Job.getInstance(conf);
        job3.setJarByClass(UserMoiveAvgrateMR.class);
        job3.setMapperClass(UserMoiveAvgrateMR3_Mapper.class);
        job3.setReducerClass(UserMoiveAvgrateMR3_Reducer.class);
        job3.setGroupingComparatorClass(FUserMoiveGroup.class);
        job3.setOutputKeyClass(FUserMoviebean.class);
        job3.setOutputValueClass(NullWritable.class);
        URI uri = new URI("/a/homework11_4_2/part-r-00000");
        job3.addCacheFile(uri); // job 2's single-line result
        Path inputPath3 = new Path("/a/totalFilmInfos.txt");
        Path outputPath3 = new Path("/a/homework11_4_3");
        if (fs3.exists(outputPath3)) {
            fs3.delete(outputPath3, true);
        }
        FileInputFormat.setInputPaths(job3, inputPath3);
        FileOutputFormat.setOutputPath(job3, outputPath3);

        // ---------------- job 4: average rating of those 10 movies ----------------
        FileSystem fs4 = FileSystem.get(conf);
        Job job4 = Job.getInstance(conf);
        job4.setJarByClass(UserMoiveAvgrateMR.class);
        job4.setMapperClass(UserMoiveAvgrateMR4_Mapper.class);
        job4.setReducerClass(UserMoiveAvgrateMR4_Reducer.class);
        job4.setOutputKeyClass(Text.class);
        job4.setOutputValueClass(Text.class);
        URI uri4 = new URI("/a/homework11_4_3/part-r-00000");
        job4.addCacheFile(uri4); // job 3's top-10 list
        Path inputPath4 = new Path("/a/totalFilmInfos.txt");
        Path outputPath4 = new Path("/a/homework11_4_4");
        if (fs4.exists(outputPath4)) {
            fs4.delete(outputPath4, true);
        }
        FileInputFormat.setInputPaths(job4, inputPath4);
        FileOutputFormat.setOutputPath(job4, outputPath4);

        // ---------------- chain the four jobs with JobControl ----------------
        ControlledJob aJob = new ControlledJob(job.getConfiguration());
        ControlledJob bJob = new ControlledJob(job2.getConfiguration());
        ControlledJob cJob = new ControlledJob(job3.getConfiguration());
        ControlledJob dJob = new ControlledJob(job4.getConfiguration());
        aJob.setJob(job);
        bJob.setJob(job2);
        cJob.setJob(job3);
        dJob.setJob(job4);

        JobControl jc = new JobControl("jc");
        jc.addJob(aJob);
        jc.addJob(bJob);
        jc.addJob(cJob);
        jc.addJob(dJob);
        bJob.addDependingJob(aJob);
        cJob.addDependingJob(bJob);
        dJob.addDependingJob(cJob);

        Thread thread = new Thread(jc);
        thread.start();
        while (!jc.allFinished()) {
            Thread.sleep(1000);
        }
        jc.stop();
    }

    // (4) For the female user with the most ratings (uid), take the 10 movies she rated
    // highest (movieid) and compute each movie's average rating (avgrate): output (user, movie title, rating).
    // Input columns: userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType

    // ---------------- job 1: count ratings per female user ----------------
    public static class UserMoiveAvgrateMR_Mapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        Text kout = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] reads = value.toString().trim().split("::");
            String sex = reads[4];
            String userid = reads[0];
            if ("F".equals(sex)) { // keep only ratings made by women
                kout.set("F" + "\t" + userid);
                context.write(kout, NullWritable.get());
            }
        }
    }

    public static class UserMoiveAvgrateMR_Reducer extends Reducer<Text, NullWritable, Text, Text> {
        Text valueout = new Text();

        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            int rateNum = 0;
            for (NullWritable inv : values) {
                rateNum++;
            }
            valueout.set(String.valueOf(rateNum));
            context.write(key, valueout);
        }
    }

    // ---------------- job 2: the user with the most ratings ----------------
    public static class UserMoiveAvgrateMR2_Mapper extends Mapper<LongWritable, Text, FMaxRateBean, NullWritable> {
        FMaxRateBean fm = new FMaxRateBean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] reads = value.toString().trim().split("\t"); // output of job 1: gender, uid, count
            fm.setSex(reads[0]);
            fm.setUid(reads[1]);
            fm.setNum(Integer.parseInt(reads[2]));
            context.write(fm, NullWritable.get());
        }
    }

    public static class UserMoiveAvgrateMR2_Reducer extends Reducer<FMaxRateBean, NullWritable, FMaxRateBean, NullWritable> {
        @Override
        protected void reduce(FMaxRateBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // Records arrive grouped by gender and sorted by rating count descending,
            // so the first record of the group is the most active viewer.
            int count = 0;
            for (NullWritable inv : values) {
                count++;
                if (count <= 1) {
                    context.write(key, NullWritable.get());
                } else {
                    return;
                }
            }
        }
    }

    // ---------------- job 3: the 10 movies she rated highest ----------------
    public static class UserMoiveAvgrateMR3_Mapper extends Mapper<LongWritable, Text, FUserMoviebean, NullWritable> {
        private static String userInfo = "";

        @SuppressWarnings("deprecation")
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Load job 2's single-line result (e.g. "F 1150 1302") from the distributed cache.
            Path[] paths = context.getLocalCacheFiles();
            String str = paths[0].toUri().toString();
            String readline = null;
            BufferedReader bf = new BufferedReader(new FileReader(new File(str)));
            while ((readline = bf.readLine()) != null) {
                userInfo = readline;
            }
            IOUtils.closeStream(bf);
        }

        FUserMoviebean fu = new FUserMoviebean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] userinfos = userInfo.split("\t");
            String userid = userinfos[1];
            // Input columns: userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType
            String[] reads = value.toString().trim().split("::"); // note the delimiter!
            if (reads[0].equals(userid)) {
                fu.setUid(reads[0]);
                fu.setMoiveid(reads[1]);
                fu.setRate(Integer.parseInt(reads[2]));
                context.write(fu, NullWritable.get());
            }
        }
    }

    public static class UserMoiveAvgrateMR3_Reducer extends Reducer<FUserMoviebean, NullWritable, FUserMoviebean, NullWritable> {
        @Override
        protected void reduce(FUserMoviebean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // Records arrive grouped by uid and sorted by rating descending; keep the top 10.
            int count = 0;
            for (NullWritable inv : values) {
                count++;
                if (count <= 10) {
                    context.write(key, NullWritable.get());
                } else {
                    return;
                }
            }
        }
    }

    // ---------------- job 4: average rating of the 10 movies (user, title, rating) ----------------
    // Cached lines from job 3 look like: 1150 1230 5
    public static class UserMoiveAvgrateMR4_Mapper extends Mapper<LongWritable, Text, Text, Text> {
        private static List<String> movidmap = new ArrayList<>();

        @SuppressWarnings("deprecation")
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Load job 3's output (uid, movieid, rate) from the distributed cache.
            Path[] paths = context.getLocalCacheFiles();
            String str = paths[0].toUri().toString();
            String readline = null;
            BufferedReader bf = new BufferedReader(new FileReader(new File(str)));
            while ((readline = bf.readLine()) != null) {
                movidmap.add(readline);
            }
            IOUtils.closeStream(bf);
        }

        Text kout = new Text();
        Text valueout = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Input columns: userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType
            String[] reads = value.toString().split("::");
            String movieid = reads[1];
            for (int i = 0; i < movidmap.size(); i++) {
                String movieidKnow = movidmap.get(i).split("\t")[1];
                String uid = movidmap.get(i).split("\t")[0];
                if (movieid.equals(movieidKnow)) {
                    // key = movieid, value = movieName, rate, uid
                    kout.set(movieid);
                    valueout.set(reads[8] + "\t" + reads[2] + "\t" + uid);
                    context.write(kout, valueout);
                }
            }
        }
    }

    public static class UserMoiveAvgrateMR4_Reducer extends Reducer<Text, Text, Text, Text> {
        Text valueout = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            int count = 0;
            int totalRate = 0;
            String moivname = "";
            String uid = "";
            for (Text inv : values) {
                count++;
                String[] split = inv.toString().split("\t");
                totalRate += Integer.parseInt(split[1]);
                moivname = split[0];
                uid = split[2];
            }
            double avgRate = 1.0 * totalRate / count;
            DecimalFormat df = new DecimalFormat("#.#");
            String avg = df.format(avgRate);
            valueout.set(moivname + "\t" + avg + "\t" + uid);
            context.write(key, valueout);
        }
    }
}
Intermediate result of step 1 (excerpt):
F 1 53
F 10 401
F 1000 84
F 101 106
F 1012 55
F 1014 39
F 1024 43
F 1026 44
F 1028 61
F 1034 337
F 1038 148
F 1039 28
F 1043 36
F 1045 37
Intermediate result of step 2:
F 1150 1302
Intermediate result of step 3:
1150 951 5
1150 3671 5
1150 3307 5
1150 1230 5
1150 904 5
1150 162 5
1150 3675 5
1150 1966 5
1150 3163 5
1150 2330 5
Final result:
1230 Annie Hall (1977) 4.1 1150
162 Crumb (1994) 4.1 1150
1966 Metropolitan (1990) 3.6 1150
2330 Hands on a Hard Body (1996) 4.2 1150
3163 Topsy-Turvy (1999) 3.7 1150
3307 City Lights (1931) 4.4 1150
3671 Blazing Saddles (1974) 4 1150
3675 White Christmas (1954) 3.8 1150
904 Rear Window (1954) 4.5 1150
951 His Girl Friday (1940) 4.2 1150
Summary: because the task is split into several interdependent steps, it helps to develop and debug each job separately and only then run the whole chain in one go.
Problems encountered along the way: different files use different delimiters, and the previous job's output has to be registered as a cache file on the next job; a minimal sketch of that cache-file pattern follows.
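For reference, the cache-file pattern the jobs above rely on boils down to two pieces: the driver registers the previous job's output file, and the next mapper reads it once in setup(). A minimal sketch of that pattern, with placeholder class and path names:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class CacheFileSketch {
    // Driver side: register the previous job's output as a cache file.
    static void addCache(Job job) throws URISyntaxException {
        job.addCacheFile(new URI("/a/previous_output/part-r-00000")); // placeholder path
    }

    // Mapper side: read the cached file once per task in setup().
    public static class CachingMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        private final List<String> cached = new ArrayList<>();

        @SuppressWarnings("deprecation")
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Path[] paths = context.getLocalCacheFiles(); // local copies on the task node
            try (BufferedReader br = new BufferedReader(new FileReader(paths[0].toUri().getPath()))) {
                String line;
                while ((line = br.readLine()) != null) {
                    cached.add(line);
                }
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Join each input record against the cached lines here
            // (see the mappers of jobs 3 and 4 above for concrete versions).
        }
    }
}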