题目:
computer,huangxiaoming,85,86,41,75,93,42,85 computer,xuzheng,54,52,86,91,42 computer,huangbo,85,42,96,38 english,zhaobenshan,54,52,86,91,42,85,75 english,liuyifei,85,41,75,21,85,96,14 algorithm,liuyifei,75,85,62,48,54,96,15 computer,huangjiaju,85,75,86,85,85 english,liuyifei,76,95,86,74,68,74,48 english,huangdatou,48,58,67,86,15,33,85 algorithm,huanglei,76,95,86,74,68,74,48 algorithm,huangjiaju,85,75,86,85,85,74,86 computer,huangdatou,48,58,67,86,15,33,85 english,zhouqi,85,86,41,75,93,42,85,75,55,47,22 english,huangbo,85,42,96,38,55,47,22 algorithm,liutao,85,75,85,99,66 computer,huangzitao,85,86,41,75,93,42,85 math,wangbaoqiang,85,86,41,75,93,42,85 computer,liujialing,85,41,75,21,85,96,14,74,86 computer,liuyifei,75,85,62,48,54,96,15 computer,liutao,85,75,85,99,66,88,75,91 computer,huanglei,76,95,86,74,68,74,48 english,liujialing,75,85,62,48,54,96,15 math,huanglei,76,95,86,74,68,74,48 math,huangjiaju,85,75,86,85,85,74,86 math,liutao,48,58,67,86,15,33,85 english,huanglei,85,75,85,99,66,88,75,91 math,xuzheng,54,52,86,91,42,85,75 math,huangxiaoming,85,75,85,99,66,88,75,91 math,liujialing,85,86,41,75,93,42,85,75 english,huangxiaoming,85,86,41,75,93,42,85 algorithm,huangdatou,48,58,67,86,15,33,85 algorithm,huangzitao,85,86,41,75,93,42,85,75 一、数据解释 数据字段个数不固定: 第一个是课程名称,总共四个课程,computer,math,english,algorithm, 第二个是学生姓名,后面是每次考试的分数 二、统计需求: 1、统计每门课程的参考人数和课程平均分 2、统计每门课程参考学生的平均分,并且按课程存入不同的结果文件,要求一门课程一个结果文件,并且按平均分从高到低排序,分数保留一位小数 3、求出每门课程参考学生成绩最高的学生的信息:课程,和平均分 要求:把课程,姓名和平均分这三个字段封装成一个自定义对象,当做key, 求出结果 4、求出每门课程参考学生成绩最高的学生的信息:课程,姓名和平均分
题目一:
1、统计每门课程的参考人数和课程平均分
思路:
以课程为key,在map中求出每个学生的平均成绩,然后在reduce中求出整体的课程平均成绩。
代码:
package practice6;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Requirement 1: for every course, report the number of participating
 * students and the course average.
 *
 * Map:    emits (course, per-student average) for each input line.
 * Reduce: per course, counts students and averages their averages, emitting
 *         "count &lt;TAB&gt; average".
 *
 * @author potter
 */
public class Practice6 {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // conf.set("fs.defaultFS", "hdfs://potter2:9000");
        // System.setProperty("HADOOP_USER_NAME", "potter");
        FileSystem fs = FileSystem.get(conf);

        // BUG FIX: Job.getInstance() ignored the Configuration built above;
        // pass it explicitly so the job actually uses it.
        Job job = Job.getInstance(conf);
        job.setJarByClass(Practice6.class);

        job.setMapperClass(Practice6Mapper.class);
        job.setReducerClass(Practice6Reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path input = new Path("D:\\practice\\input6\\work6.txt");
        Path output = new Path("D:\\practice\\input6\\output1");
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);
        // Delete a stale output directory so the job can be re-run.
        if (fs.exists(output)) {
            fs.delete(output, true);
        }

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    /**
     * Maps one CSV line, e.g. "computer,huangxiaoming,85,86,41,75,93,42,85",
     * to (course, average of that student's scores).
     */
    public static class Practice6Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {

        private final Text courseKey = new Text();
        private final DoubleWritable avgValue = new DoubleWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().trim().split(",");
            // ROBUSTNESS: skip blank or malformed lines; a valid record needs
            // course, name, and at least one score (also avoids divide-by-zero).
            if (split.length < 3) {
                return;
            }
            double sum = 0;
            int count = 0;
            // Scores start at index 2 (index 0 = course, index 1 = student).
            for (int i = 2; i < split.length; i++) {
                sum += Double.parseDouble(split[i]);
                count++;
            }
            courseKey.set(split[0]);
            avgValue.set(sum / count);
            context.write(courseKey, avgValue);
        }
    }

    /**
     * Per course: counts the students and averages their per-student averages,
     * emitting "studentCount &lt;TAB&gt; courseAverage".
     */
    public static class Practice6Reducer extends Reducer<Text, DoubleWritable, Text, Text> {

        private final Text result = new Text();

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            double sum = 0;
            for (DoubleWritable v : values) {
                // BUG FIX: read the numeric value directly instead of
                // round-tripping through toString()/Double.parseDouble().
                sum += v.get();
                count++;
            }
            double avg = sum / count;
            result.set(count + "\t" + avg);
            context.write(key, result);
        }
    }
}
题目二:
2、统计每门课程参考学生的平均分,并且按课程存入不同的结果文件,要求一门课程一个结果文件,并且按平均分从高到低排序,分数保留一位小数
思路:
输出结果需要存储到不同的结果文件中,因此要通过 setNumReduceTasks 指定 reduce 任务个数,并自定义 Partitioner 设定按课程分区的规则;平均成绩需要排序,可以把各字段封装成自定义对象,通过实现 WritableComparable 接口定义排序规则。
主要程序代码:
package practice6;

import java.io.IOException;
import java.text.DecimalFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Requirement 2: compute every student's average per course, write one result
 * file per course (4 reducers + custom partitioner), sorted by average score
 * descending, with one decimal place.
 *
 * Sorting is driven by {@link Practice7Bean#compareTo}; partitioning by
 * {@link Practice7Partitioner}.
 *
 * @author potter
 */
public class Practice7 {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // conf.set("fs.defaultFS", "hdfs://potter2:9000");
        // System.setProperty("HADOOP_USER_NAME", "potter");
        FileSystem fs = FileSystem.get(conf); // defaults to the local FS

        // BUG FIX: Job.getInstance() ignored the Configuration built above;
        // pass it explicitly so the job actually uses it.
        Job job = Job.getInstance(conf);
        job.setJarByClass(Practice7.class);

        job.setMapperClass(Practice7Mapper.class);
        job.setReducerClass(Practice7Reducer.class);

        job.setMapOutputKeyClass(Practice7Bean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Practice7Bean.class);
        job.setOutputValueClass(NullWritable.class);

        job.setPartitionerClass(Practice7Partitioner.class); // one partition per course
        job.setNumReduceTasks(4); // must match the 4 courses the partitioner knows

        Path input = new Path("D:\\practice\\input6\\work6.txt");
        Path output = new Path("D:\\practice\\input6\\output2");
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);
        // Delete a stale output directory so the job can be re-run.
        if (fs.exists(output)) {
            fs.delete(output, true);
        }

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    /**
     * Maps one CSV line, e.g. "algorithm,huangzitao,85,86,41,...", to a
     * {@link Practice7Bean} key holding (course, name, average formatted to
     * one decimal place).
     */
    public static class Practice7Mapper extends Mapper<LongWritable, Text, Practice7Bean, NullWritable> {

        private final Practice7Bean bean = new Practice7Bean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().trim().split(",");
            // ROBUSTNESS: skip blank or malformed lines; a valid record needs
            // course, name, and at least one score (also avoids divide-by-zero).
            if (split.length < 3) {
                return;
            }
            double sum = 0;
            int count = 0;
            // Scores start at index 2 (index 0 = course, index 1 = student).
            for (int i = 2; i < split.length; i++) {
                sum += Double.parseDouble(split[i]);
                count++;
            }
            double avg = sum / count;
            // One decimal place, per the requirement.
            // NOTE(review): DecimalFormat uses the default locale; assumes a
            // '.' decimal separator — confirm if ever run under other locales.
            DecimalFormat df = new DecimalFormat("0.0");
            bean.setCourse(split[0]);
            bean.setName(split[1]);
            bean.setFormat(df.format(avg));
            context.write(bean, NullWritable.get());
        }
    }

    /**
     * Identity reducer: keys arrive already partitioned by course and sorted
     * by average (descending), so each is written straight through.
     */
    public static class Practice7Reducer extends Reducer<Practice7Bean, NullWritable, Practice7Bean, NullWritable> {

        @Override
        protected void reduce(Practice7Bean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}
分区器代码:
package practice6; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Partitioner; /** * 分区器 * @author potter * */ public class Practice7Partitioner extends Partitioner<Practice7Bean, NullWritable>{ @Override public int getPartition(Practice7Bean key, NullWritable value, int numPartitions) { //key现在是course + "," + name + "," + format;需要用course来分区 if (key.getCourse().equals("math")) { return 0; }else if (key.getCourse().equals("english")) { return 1; }else if (key.getCourse().equals("algorithm")) { return 2; }else{ return 3; } } }
实体类定义代码:
package practice6;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite map-output key: (course, student name, average score formatted to
 * one decimal place). Natural order is by average score, descending, so the
 * shuffle sort produces the required high-to-low ordering.
 *
 * @author potter
 */
public class Practice7Bean implements WritableComparable<Practice7Bean> {

    private String name;
    private String course;
    // Average score pre-formatted to one decimal place (e.g. "83.0").
    private String format;

    /** Hadoop requires a public no-arg constructor for deserialization. */
    public Practice7Bean() {
    }

    public Practice7Bean(String name, String course, String format) {
        super();
        this.name = name;
        this.course = course;
        this.format = format;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getCourse() {
        return course;
    }

    public void setCourse(String course) {
        this.course = course;
    }

    public String getFormat() {
        return format;
    }

    public void setFormat(String format) {
        this.format = format;
    }

    @Override
    public String toString() {
        return course + "," + name + "," + format;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeUTF(course);
        out.writeUTF(format);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Must read fields in exactly the order write() emitted them.
        name = in.readUTF();
        course = in.readUTF();
        format = in.readUTF();
    }

    /**
     * Orders beans by average score, highest first.
     *
     * BUG FIX: the previous implementation compared the formatted strings
     * lexicographically, which mis-orders scores of different widths
     * (e.g. "9.9" would sort above "88.0"). Compare numerically instead.
     */
    @Override
    public int compareTo(Practice7Bean o) {
        // Descending: the other bean's score is the first argument.
        // NOTE(review): assumes format uses '.' as decimal separator, which
        // parseDouble requires — holds for the default DecimalFormat("0.0")
        // output in this project; confirm under non-English locales.
        return Double.compare(Double.parseDouble(o.format), Double.parseDouble(this.format));
    }
}
输出结果:
part-r-00000输出结果:
math,huangxiaoming,83.0 math,huangjiaju,82.3 math,huanglei,74.4 math,liujialing,72.8 math,wangbaoqiang,72.4 math,xuzheng,69.3 math,liutao,56.0
part-r-00001输出结果:
english,huanglei,83.0 english,liuyifei,74.4 english,huangxiaoming,72.4 english,zhaobenshan,69.3 english,zhouqi,64.2 english,liujialing,62.1 english,liuyifei,59.6 english,huangdatou,56.0 english,huangbo,55.0
part-r-00002输出结果:
algorithm,huangjiaju,82.3 algorithm,liutao,82.0 algorithm,huanglei,74.4 algorithm,huangzitao,72.8 algorithm,liuyifei,62.1 algorithm,huangdatou,56.0
part-r-00003输出结果:
computer,huangjiaju,83.2 computer,liutao,83.0 computer,huanglei,74.4 computer,huangzitao,72.4 computer,huangbo,65.2 computer,xuzheng,65.0 computer,liujialing,64.1 computer,liuyifei,62.1 computer,huangdatou,56.0
题目三:
3、求出每门课程参考学生成绩最高的学生的信息:课程,姓名和平均分
解题思路:
题目涉及排序以及分组,分组使用WritableComparator,进行分组字段设置。其中需要注意的是分组字段与排序字段的关系:分组字段一定是排序字段中的前几个
举例:排序规则:a,b,c,d,e。那么分组规则就只能是以下情况中的任意一种:
a / a,b / a,b,c / a,b,c,d / a,b,c,d,e 不能跳跃
排序字段一定大于等于分组字段,并且包含分组字段
使用分组组件(GroupingComparator)按课程分组,每组中排在首位的 key 即为该课程平均分最高的学生:
主要程序代码:
package practice6;

import java.io.IOException;
import java.text.DecimalFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Requirement 3/4: for each course, output the top-scoring student as
 * (course, name, average).
 *
 * Keys sort by (course asc, average desc) via {@link Practice8Bean#compareTo};
 * {@link Practice8Group} groups them by course alone, so the first key the
 * reducer sees in each group is that course's maximum.
 *
 * @author potter
 */
public class Practice8 {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // conf.set("fs.defaultFS", "hdfs://potter2:9000");
        // System.setProperty("HADOOP_USER_NAME", "potter");
        FileSystem fs = FileSystem.get(conf);

        // BUG FIX: Job.getInstance() ignored the Configuration built above;
        // pass it explicitly so the job actually uses it.
        Job job = Job.getInstance(conf);
        job.setJarByClass(Practice8.class);

        job.setMapperClass(Practice8Mapper.class);
        job.setReducerClass(Practice8Reducer.class);

        job.setMapOutputKeyClass(Practice8Bean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Practice8Bean.class);
        job.setOutputValueClass(NullWritable.class);

        // Group keys by course only, so each reduce() call sees one course.
        job.setGroupingComparatorClass(Practice8Group.class);

        Path input = new Path("D:\\practice\\input6\\work6.txt");
        Path output = new Path("D:\\practice\\input6\\output3");
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);
        // Delete a stale output directory so the job can be re-run.
        if (fs.exists(output)) {
            fs.delete(output, true);
        }

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    /**
     * Maps one CSV line to a {@link Practice8Bean} key holding (course, name,
     * average formatted to one decimal place).
     */
    public static class Practice8Mapper extends Mapper<LongWritable, Text, Practice8Bean, NullWritable> {

        private final Practice8Bean bean = new Practice8Bean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().trim().split(",");
            // ROBUSTNESS: skip blank or malformed lines; a valid record needs
            // course, name, and at least one score (also avoids divide-by-zero).
            if (split.length < 3) {
                return;
            }
            double sum = 0;
            int count = 0;
            // Scores start at index 2 (index 0 = course, index 1 = student).
            for (int i = 2; i < split.length; i++) {
                sum += Double.parseDouble(split[i]);
                count++;
            }
            double avg = sum / count;
            // One decimal place, per the requirement.
            DecimalFormat df = new DecimalFormat("0.0");
            bean.setCourse(split[0]);
            bean.setName(split[1]);
            bean.setFormat(df.format(avg));
            context.write(bean, NullWritable.get());
        }
    }

    /**
     * Emits only the first key of each course group, which — given the
     * (course asc, average desc) sort order — is the course's top student.
     */
    public static class Practice8Reducer extends Reducer<Practice8Bean, NullWritable, Practice8Bean, NullWritable> {

        @Override
        protected void reduce(Practice8Bean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // SIMPLIFICATION: the original iterated the values counting to 1;
            // the key passed in is already the group's first (i.e. maximum)
            // key, so it can be written directly.
            context.write(key, NullWritable.get());
        }
    }
}
实体类定义:
package practice6;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite map-output key: (course, student name, average score formatted to
 * one decimal place). Natural order is course ascending, then average score
 * descending — so within each course group the top student sorts first.
 *
 * @author potter
 */
public class Practice8Bean implements WritableComparable<Practice8Bean> {

    private String name;
    private String course;
    // Average score pre-formatted to one decimal place (e.g. "83.0").
    private String format;

    /** Hadoop requires a public no-arg constructor for deserialization. */
    public Practice8Bean() {
    }

    public Practice8Bean(String name, String course, String format) {
        super();
        this.name = name;
        this.course = course;
        this.format = format;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getCourse() {
        return course;
    }

    public void setCourse(String course) {
        this.course = course;
    }

    public String getFormat() {
        return format;
    }

    public void setFormat(String format) {
        this.format = format;
    }

    @Override
    public String toString() {
        return course + "," + name + "," + format;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeUTF(course);
        out.writeUTF(format);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Must read fields in exactly the order write() emitted them.
        name = in.readUTF();
        course = in.readUTF();
        format = in.readUTF();
    }

    /**
     * Shuffle sort order: course ascending, then average score descending.
     *
     * BUG FIX: the previous implementation broke ties by comparing the
     * formatted score strings lexicographically, which mis-orders scores of
     * different widths (e.g. "9.9" would sort above "88.0"). Compare
     * numerically instead.
     */
    @Override
    public int compareTo(Practice8Bean o) {
        int byCourse = this.course.compareTo(o.course);
        if (byCourse != 0) {
            return byCourse;
        }
        // Descending score: the other bean's value is the first argument.
        // NOTE(review): assumes format uses '.' as decimal separator, which
        // parseDouble requires — holds for the default DecimalFormat("0.0")
        // output in this project; confirm under non-English locales.
        return Double.compare(Double.parseDouble(o.format), Double.parseDouble(this.format));
    }
}
分组组件:
package practice6; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * 分组组件 * @author potter * */ //主要就是对于分组进行排序,分组只按照组建键中的一个值进行分组 public class Practice8Group extends WritableComparator{ //必须要调用父类的构造器 public Practice8Group() { super(Practice8Bean.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { Practice8Bean s1 = (Practice8Bean) a; Practice8Bean s2 = (Practice8Bean) b; return s1.getCourse().compareTo(s2.getCourse()); } }