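Preface
In Hadoop, sorting is the heart of MapReduce. Both MapTask and ReduceTask sort their data by key. This is the framework's default behavior, and it happens whether or not your business logic needs it.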
Key techniques
The MapReduce framework relies mainly on two sorting mechanisms: quicksort and a heap-based priority queue (PriorityQueue).
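Mapper phase
Map output is first written to a circular buffer. When the buffer reaches 80% of its capacity, its contents are sorted by partition and then by key (using the framework's modified quicksort) and spilled to disk as a file in IFile format. When the map finishes, the spill files are merge-sorted into one large file (using a heap-based priority queue), from which each reducer fetches the data for its own partition.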
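The ordering applied before a spill can be sketched in plain Java. This is only a toy model under stated assumptions: `SpillSortSketch` and `Rec` are hypothetical names rather than Hadoop classes, and the JDK's `List.sort` stands in for Hadoop's own quicksort. The point it illustrates is that records are ordered by partition first and by key second.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Toy model of a map-side spill: records are ordered by partition, then by key. */
final class SpillSortSketch {

    /** One buffered map output record (hypothetical type, not a Hadoop class). */
    static final class Rec {
        final int partition;
        final String key;

        Rec(int partition, String key) {
            this.partition = partition;
            this.key = key;
        }

        @Override
        public String toString() {
            return partition + ":" + key;
        }
    }

    /** Returns a copy of the buffer sorted by (partition, key). */
    static List<Rec> sortForSpill(List<Rec> buffer) {
        List<Rec> sorted = new ArrayList<>(buffer);
        // JDK sort used for brevity; Hadoop uses its own quicksort here.
        sorted.sort(Comparator.<Rec>comparingInt(r -> r.partition)
                .thenComparing(r -> r.key));
        return sorted;
    }

    public static void main(String[] args) {
        List<Rec> buffer = Arrays.asList(
                new Rec(1, "math"), new Rec(0, "english"),
                new Rec(1, "algorithm"), new Rec(0, "computer"));
        // prints [0:computer, 0:english, 1:algorithm, 1:math]
        System.out.println(sortForSpill(buffer));
    }
}
```

Because all records of one partition end up contiguous, each reducer can later read its own slice of the merged file.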
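Reducer phase
The data fetched from the mappers is already partially sorted, so a Reduce Task needs only one merge sort to make the data globally ordered. To improve efficiency, Hadoop runs the sort phase and the reduce phase in parallel. In the sort phase, the Reduce Task builds a min-heap over the in-memory and on-disk files and keeps an iterator pointing at the heap's root. It advances this iterator repeatedly to hand records with the same key to reduce() in order. Advancing the iterator is in effect repeatedly adjusting the min-heap (build the heap → take the top element → re-adjust the heap → take the top element ...), which is how sort and reduce proceed in parallel.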
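A minimal sketch of this heap-driven merge, in plain Java and without any Hadoop classes: `HeapMergeSketch` and `Head` are hypothetical names, each inner list stands for one already-sorted run fetched from a mapper, and the "reduce" step is simplified to counting the records of each key group.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Toy k-way merge: a min-heap holds the current head of every sorted run. */
final class HeapMergeSketch {

    /** Heap entry: the smallest unread key of one run plus the rest of that run. */
    static final class Head {
        final String key;
        final Iterator<String> rest;

        Head(String key, Iterator<String> rest) {
            this.key = key;
            this.rest = rest;
        }
    }

    /** Merges the sorted runs and counts how many records each key group has. */
    static Map<String, Integer> mergeAndGroup(List<List<String>> sortedRuns) {
        PriorityQueue<Head> heap = new PriorityQueue<>((a, b) -> a.key.compareTo(b.key));
        for (List<String> run : sortedRuns) {
            Iterator<String> it = run.iterator();
            if (it.hasNext()) {
                heap.add(new Head(it.next(), it));
            }
        }
        // Keys leave the heap in sorted order, so equal keys are adjacent.
        Map<String, Integer> groups = new LinkedHashMap<>();
        while (!heap.isEmpty()) {
            Head smallest = heap.poll();                 // take the heap top
            groups.merge(smallest.key, 1, Integer::sum); // stand-in for reduce()
            if (smallest.rest.hasNext()) {               // refill from the same run
                heap.add(new Head(smallest.rest.next(), smallest.rest));
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        List<List<String>> runs = Arrays.asList(
                Arrays.asList("algorithm", "computer", "math"),
                Arrays.asList("computer", "english"),
                Arrays.asList("english", "math", "math"));
        // prints {algorithm=1, computer=2, english=2, math=3}
        System.out.println(mergeAndGroup(runs));
    }
}
```

The consumer never waits for a fully sorted file: each record is processed as soon as it leaves the heap, which is the sense in which sorting and reducing overlap.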
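Grouped Top N analysis
A common scenario in data processing is to group table rows by some field and then find the largest few records within each group. Below we work through this grouped Top N problem, which can be implemented with several tools such as Hive and MapReduce.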
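Before turning to MapReduce, the problem itself can be sketched in a few lines of plain Java. This is an illustration, not the approach used in the jobs that follow: `GroupTopNSketch` and its `row` helper are hypothetical, the scores are made up, and each group keeps a min-heap capped at N elements so that only the N largest values survive.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

/** Grouped Top N with one size-bounded min-heap per group. */
final class GroupTopNSketch {

    /** Convenience factory for a (group, score) row. */
    static Map.Entry<String, Double> row(String group, double score) {
        return new SimpleEntry<>(group, score);
    }

    /** Returns, for every group, its n largest scores in descending order. */
    static Map<String, List<Double>> topN(List<Map.Entry<String, Double>> rows, int n) {
        Map<String, PriorityQueue<Double>> heaps = new TreeMap<>();
        for (Map.Entry<String, Double> r : rows) {
            PriorityQueue<Double> heap =
                    heaps.computeIfAbsent(r.getKey(), k -> new PriorityQueue<>());
            heap.add(r.getValue());
            if (heap.size() > n) {
                heap.poll(); // drop the smallest, keeping the n largest so far
            }
        }
        Map<String, List<Double>> result = new TreeMap<>();
        for (Map.Entry<String, PriorityQueue<Double>> e : heaps.entrySet()) {
            List<Double> best = new ArrayList<>(e.getValue());
            best.sort(Collections.reverseOrder());
            result.put(e.getKey(), best);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Double>> rows = Arrays.asList(
                row("math", 90.0), row("math", 70.0), row("math", 85.0),
                row("math", 60.0), row("english", 75.0), row("english", 88.0));
        // prints {english=[88.0, 75.0], math=[90.0, 85.0]}
        System.out.println(topN(rows, 2));
    }
}
```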
Sample scenario
computer,huangxiaoming,85,86,41,75,93,42,85
computer,xuzheng,54,52,86,91,42
computer,huangbo,85,42,96,38
english,zhaobenshan,54,52,86,91,42,85,75
english,liuyifei,85,41,75,21,85,96,14
algorithm,liuyifei,75,85,62,48,54,96,15
computer,huangjiaju,85,75,86,85,85
english,liuyifei,76,95,86,74,68,74,48
english,huangdatou,48,58,67,86,15,33,85
algorithm,huanglei,76,95,86,74,68,74,48
algorithm,huangjiaju,85,75,86,85,85,74,86
computer,huangdatou,48,58,67,86,15,33,85
english,zhouqi,85,86,41,75,93,42,85,75,55,47,22
english,huangbo,85,42,96,38,55,47,22
algorithm,liutao,85,75,85,99,66
computer,huangzitao,85,86,41,75,93,42,85
math,wangbaoqiang,85,86,41,75,93,42,85
computer,liujialing,85,41,75,21,85,96,14,74,86
computer,liuyifei,75,85,62,48,54,96,15
computer,liutao,85,75,85,99,66,88,75,91
computer,huanglei,76,95,86,74,68,74,48
english,liujialing,75,85,62,48,54,96,15
math,huanglei,76,95,86,74,68,74,48
math,huangjiaju,85,75,86,85,85,74,86
math,liutao,48,58,67,86,15,33,85
english,huanglei,85,75,85,99,66,88,75,91
math,xuzheng,54,52,86,91,42,85,75
math,huangxiaoming,85,75,85,99,66,88,75,91
math,liujialing,85,86,41,75,93,42,85,75
english,huangxiaoming,85,86,41,75,93,42,85
algorithm,huangdatou,48,58,67,86,15,33,85
algorithm,huangzitao,85,86,41,75,93,42,85,75
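I. Data description
The number of fields per record is not fixed:
The first field is the course name. There are four courses in total: computer, math, english, and algorithm.
The second field is the student's name, and the remaining fields are the scores from each exam.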
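Parsing one such record can be sketched as follows. `ScoreLineSketch` is a hypothetical helper, not part of the jobs in this article. It reads every field from index 2 onward as a score, whatever the record length, and formats an average to one decimal place.

```java
import java.util.Locale;

/** Parses a "course,name,score,score,..." record with a variable number of scores. */
final class ScoreLineSketch {

    /** Returns the course name, i.e. the first field. */
    static String course(String line) {
        return line.split(",")[0];
    }

    /** Averages every field from index 2 onward. */
    static double average(String line) {
        String[] fields = line.split(",");
        int sum = 0;
        int count = 0;
        for (int i = 2; i < fields.length; i++) {
            sum += Integer.parseInt(fields[i].trim());
            count++;
        }
        if (count == 0) {
            throw new IllegalArgumentException("record has no scores: " + line);
        }
        return (double) sum / count;
    }

    /** Formats a value with exactly one decimal place. */
    static String formatOneDecimal(double value) {
        return String.format(Locale.ROOT, "%.1f", value);
    }

    public static void main(String[] args) {
        // 85 + 42 + 96 + 38 = 261, and 261 / 4 = 65.25
        System.out.println(average("computer,huangbo,85,42,96,38"));
        // 507 / 7 is about 72.43, so this prints 72.4
        System.out.println(formatOneDecimal(
                average("computer,huangxiaoming,85,86,41,75,93,42,85")));
    }
}
```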
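II. Requirements
1. Count the number of students who took each course and compute each course's average score.
2. Compute the average score of every student in each course and write the results to separate files, one file per course, sorted by average score from high to low, with scores kept to one decimal place.
3. For each course, find the student with the highest score and output the course, the name, and the average score.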
Question 1
CourseScoreMR1.java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CourseScoreMR1 {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);

        job.setJarByClass(CourseScoreMR1.class);
        job.setMapperClass(CourseScoreMR1Mapper.class);
        job.setReducerClass(CourseScoreMR1Reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path inputPath = new Path("E:\\bigdata\\cs\\input");
        Path outputPath = new Path("E:\\bigdata\\cs\\output_1");
        FileInputFormat.setInputPaths(job, inputPath);
        // Remove a previous run's output, otherwise the job refuses to start.
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    public static class CourseScoreMR1Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {

        /**
         * Input fields: course, name, then one or more scores.
         *
         * value == algorithm,huangzitao,85,86,41,75,93,42,85,75
         *
         * Output key   : course
         * Output value : avgScore (this student's average in the course)
         *
         * Formatting-related APIs: NumberFormat (numbers), SimpleDateFormat (dates)
         */

        Text outKey = new Text();
        DoubleWritable outValue = new DoubleWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] split = value.toString().split(",");

            String course = split[0];

            int sum = 0;
            int count = 0;

            // Scores start at index 2; the number of scores varies per record.
            for (int i = 2; i < split.length; i++) {
                int tempScore = Integer.parseInt(split[i]);
                sum += tempScore;
                count++;
            }

            // Skip records without any score to avoid emitting NaN.
            if (count == 0) {
                return;
            }

            double avgScore = 1D * sum / count;

            outKey.set(course);
            outValue.set(avgScore);

            context.write(outKey, outValue);
        }
    }

    public static class CourseScoreMR1Reducer extends Reducer<Text, DoubleWritable, Text, Text> {

        Text outValue = new Text();

        /**
         * key    : course
         * values : per-student averages, e.g. 98.7 87.6
         */
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {

            double sum = 0;
            int count = 0;

            // One value per student record, so count is the number of participants.
            for (DoubleWritable dw : values) {
                sum += dw.get();
                count++;
            }

            double lastAvgScore = sum / count;

            outValue.set(count + "\t" + lastAvgScore);

            context.write(key, outValue);
        }
    }
}
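One property of this job is worth spelling out: the mapper emits one average per student, and the reducer averages those averages. When students have taken different numbers of exams, that result differs from the mean of all individual scores. The sketch below uses made-up scores and a hypothetical `CourseAverageSketch` class to show the difference.

```java
import java.util.Arrays;
import java.util.List;

/** Contrasts "mean of per-student means" with "mean of all scores". */
final class CourseAverageSketch {

    /** What the job above computes: average each student, then average those. */
    static double meanOfStudentAverages(List<int[]> scoresPerStudent) {
        double sumOfAverages = 0;
        for (int[] scores : scoresPerStudent) {
            int sum = 0;
            for (int s : scores) {
                sum += s;
            }
            sumOfAverages += (double) sum / scores.length;
        }
        return sumOfAverages / scoresPerStudent.size();
    }

    /** Alternative definition: pool every score of the course, then average. */
    static double meanOfAllScores(List<int[]> scoresPerStudent) {
        long sum = 0;
        int count = 0;
        for (int[] scores : scoresPerStudent) {
            for (int s : scores) {
                sum += s;
                count++;
            }
        }
        return (double) sum / count;
    }

    public static void main(String[] args) {
        // Made-up data: one student with a single exam, one with three.
        List<int[]> course = Arrays.asList(new int[] {100}, new int[] {50, 50, 50});
        System.out.println(meanOfStudentAverages(course)); // prints 75.0
        System.out.println(meanOfAllScores(course));       // prints 62.5
    }
}
```

Which definition is wanted depends on how "course average" is meant in the requirement; the job above implements the first one.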
Question 2
CourseScoreMR2.java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.ghgj.mr.exercise.pojo.CourseScore;
import com.ghgj.mr.exercise.ptn.CSPartitioner;

public class CourseScoreMR2 {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);

        job.setJarByClass(CourseScoreMR2.class);
        job.setMapperClass(CourseScoreMR2Mapper.class);
        job.setReducerClass(CourseScoreMR2Reducer.class);

        job.setMapOutputKeyClass(CourseScore.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(CourseScore.class);
        job.setOutputValueClass(NullWritable.class);

        // One reduce task per course, so that each course gets its own output file.
        job.setPartitionerClass(CSPartitioner.class);
        job.setNumReduceTasks(4);

        Path inputPath = new Path("E:\\bigdata\\cs\\input");
        Path outputPath = new Path("E:\\bigdata\\cs\\output_2");
        FileInputFormat.setInputPaths(job, inputPath);
        // Remove a previous run's output, otherwise the job refuses to start.
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    public static class CourseScoreMR2Mapper extends Mapper<LongWritable, Text, CourseScore, NullWritable> {

        // Reused for every record; context.write serializes it immediately.
        CourseScore cs = new CourseScore();

        /**
         * value = math,huangxiaoming,85,75,85,99,66,88,75,91
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] split = value.toString().split(",");

            String course = split[0];
            String name = split[1];

            int sum = 0;
            int count = 0;

            for (int i = 2; i < split.length; i++) {
                int tempScore = Integer.parseInt(split[i]);
                sum += tempScore;
                count++;
            }

            // Skip records without any score to avoid emitting NaN.
            if (count == 0) {
                return;
            }

            double avgScore = 1D * sum / count;

            cs.setCourse(course);
            cs.setName(name);
            cs.setScore(avgScore);

            context.write(cs, NullWritable.get());
        }
    }

    public static class CourseScoreMR2Reducer extends Reducer<CourseScore, NullWritable, CourseScore, NullWritable> {

        /**
         * The keys arrive already sorted by CourseScore.compareTo (not shown in this
         * article; assumed to order by average score from high to low), so the
         * reducer only has to write them out unchanged.
         */
        @Override
        protected void reduce(CourseScore key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

            // Iterate the values so that keys comparing as equal are all written.
            for (NullWritable nvl : values) {
                context.write(key, nvl);
            }
        }
    }
}
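The job above imports a CourseScore key class that is not listed here. The following is a guess at what such a class could look like, not the author's code. `CourseScoreSketch` is a hypothetical stand-in that implements plain `Comparable` so it compiles without Hadoop; in a real job it would be declared as implementing Hadoop's `WritableComparable<CourseScore>`, whose `write(DataOutput)` and `readFields(DataInput)` methods have the same shape as the ones below. The ordering (course ascending, then score descending) and the one-decimal `toString` are assumptions chosen to satisfy requirement 2.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Locale;

/** Hypothetical stand-in for the CourseScore key class (not shown in the article). */
final class CourseScoreSketch implements Comparable<CourseScoreSketch> {
    private String course = "";
    private String name = "";
    private double score;

    CourseScoreSketch() { }

    CourseScoreSketch(String course, String name, double score) {
        this.course = course;
        this.name = name;
        this.score = score;
    }

    String getCourse() { return course; }
    void setCourse(String course) { this.course = course; }
    String getName() { return name; }
    void setName(String name) { this.name = name; }
    double getScore() { return score; }
    void setScore(double score) { this.score = score; }

    /** Serializes the fields; must mirror readFields exactly. */
    public void write(DataOutput out) throws IOException {
        out.writeUTF(course);
        out.writeUTF(name);
        out.writeDouble(score);
    }

    /** Restores the fields in the order they were written. */
    public void readFields(DataInput in) throws IOException {
        course = in.readUTF();
        name = in.readUTF();
        score = in.readDouble();
    }

    /** Course ascending, then score descending, then name as a tie-breaker. */
    @Override
    public int compareTo(CourseScoreSketch other) {
        int byCourse = course.compareTo(other.course);
        if (byCourse != 0) {
            return byCourse;
        }
        int byScore = Double.compare(other.score, score);
        return byScore != 0 ? byScore : name.compareTo(other.name);
    }

    /** Tab-separated output line with the score kept to one decimal place. */
    @Override
    public String toString() {
        return course + "\t" + name + "\t" + String.format(Locale.ROOT, "%.1f", score);
    }
}
```

Since the job writes its key with the default text output, `toString` is where the one-decimal formatting would take effect.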
CSPartitioner.java
package com.ghgj.mr.exercise.ptn;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

import com.ghgj.mr.exercise.pojo.CourseScore;

public class CSPartitioner extends Partitioner<CourseScore, NullWritable> {

    /**
     * Sends each course to its own partition. The job must run with
     * four reduce tasks so that every returned index is valid.
     */
    @Override
    public int getPartition(CourseScore key, NullWritable value, int numPartitions) {

        String course = key.getCourse();
        if (course.equals("math")) {
            return 0;
        } else if (course.equals("english")) {
            return 1;
        } else if (course.equals("computer")) {
            return 2;
        } else {
            // "algorithm", and any other course name, falls through to here.
            return 3;
        }
    }
}
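To see which result file each course ends up in, the mapping can be reproduced in plain Java. `PartitionFileSketch` is a hypothetical helper. It assumes the job uses the default file naming of the new MapReduce API, where reduce task number p writes a file named part-r- followed by p padded to five digits.

```java
import java.util.Locale;

/** Reproduces the course-to-partition mapping and the resulting file name. */
final class PartitionFileSketch {

    /** Same branching as CSPartitioner.getPartition. */
    static int partitionFor(String course) {
        if (course.equals("math")) {
            return 0;
        } else if (course.equals("english")) {
            return 1;
        } else if (course.equals("computer")) {
            return 2;
        }
        return 3; // "algorithm" and anything unrecognized
    }

    /** Output file written by the reduce task that owns the partition. */
    static String outputFileFor(String course) {
        return String.format(Locale.ROOT, "part-r-%05d", partitionFor(course));
    }

    public static void main(String[] args) {
        for (String course : new String[] {"math", "english", "computer", "algorithm"}) {
            System.out.println(course + " -> " + outputFileFor(course));
        }
    }
}
```

Note that a misspelled or new course name would silently share part-r-00003 with algorithm, so a real job might prefer to fail on unknown courses.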
Question 3
CourseScoreMR3.java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.ghgj.mr.exercise.gc.CourseScoreGC;
import com.ghgj.mr.exercise.pojo.CourseScore;

public class CourseScoreMR3 {

    // Number of records to keep per course; set to 1 to output only the top student.
    private static final int TOPN = 3;

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);

        job.setJarByClass(CourseScoreMR3.class);
        job.setMapperClass(CourseScoreMR3Mapper.class);
        job.setReducerClass(CourseScoreMR3Reducer.class);

        job.setMapOutputKeyClass(CourseScore.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(CourseScore.class);
        job.setOutputValueClass(NullWritable.class);

        // job.setPartitionerClass(CSPartitioner.class);
        // job.setNumReduceTasks(4);

        // Specify the grouping rule: keys with the same course form one group.
        job.setGroupingComparatorClass(CourseScoreGC.class);

        Path inputPath = new Path("E:\\bigdata\\cs\\input");
        Path outputPath = new Path("E:\\bigdata\\cs\\output_3_last");
        FileInputFormat.setInputPaths(job, inputPath);
        // Remove a previous run's output, otherwise the job refuses to start.
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    public static class CourseScoreMR3Mapper extends Mapper<LongWritable, Text, CourseScore, NullWritable> {

        CourseScore cs = new CourseScore();

        /**
         * value = math,huangxiaoming,85,75,85,99,66,88,75,91
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] split = value.toString().split(",");

            String course = split[0];
            String name = split[1];

            int sum = 0;
            int count = 0;

            for (int i = 2; i < split.length; i++) {
                int tempScore = Integer.parseInt(split[i]);
                sum += tempScore;
                count++;
            }

            // Skip records without any score to avoid emitting NaN.
            if (count == 0) {
                return;
            }

            double avgScore = 1D * sum / count;

            cs.setCourse(course);
            cs.setName(name);
            cs.setScore(avgScore);

            context.write(cs, NullWritable.get());
        }
    }

    public static class CourseScoreMR3Reducer extends Reducer<CourseScore, NullWritable, CourseScore, NullWritable> {

        /**
         * reduce() receives one group of key-value pairs whose keys compare as equal
         * under the grouping comparator (here: the same course). It is called once
         * for every such group.
         *
         * The values iterator can be traversed only once. As it advances, the value
         * changes and the key object is updated to the key that belongs to the
         * current value, so writing the key inside the loop emits a different
         * record on each iteration.
         */
        @Override
        protected void reduce(CourseScore key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

            int count = 0;

            // Assumes CourseScore.compareTo (not shown) sorts by course and then by
            // score from high to low, so the first TOPN records are the best ones.
            for (NullWritable nvl : values) {
                context.write(key, nvl);

                if (++count == TOPN) {
                    return;
                }
            }
        }
    }
}
CourseScoreGC.java
package com.ghgj.mr.exercise.gc;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

import com.ghgj.mr.exercise.pojo.CourseScore;

/**
 * Defines the grouping rule.
 */
public class CourseScoreGC extends WritableComparator {

    public CourseScoreGC() {
        // true: let the comparator create CourseScore instances for deserialization.
        super(CourseScore.class, true);
    }

    /**
     * What this method does can usually be guessed from its name.
     *
     * Parameters: two objects used as keys in the MR program, to be tested for
     * whether they belong to the same group.
     * Return value: an int. A result of 0 means the two keys are treated as the
     * same key, so their records are passed to the same reduce() call.
     *
     * In our requirement the grouping rule is the course.
     */
    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {

        CourseScore cs1 = (CourseScore) a;
        CourseScore cs2 = (CourseScore) b;

        return cs1.getCourse().compareTo(cs2.getCourse());
    }
}
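How the sort order and the grouping comparator work together can be simulated without Hadoop. In the sketch below, `GroupingSketch`, `Row`, `SORT`, and `GROUP` are hypothetical names and the rows are made up. `SORT` plays the role of the key's own ordering (course, then score from high to low), while `GROUP` plays the role of CourseScoreGC and decides where one reduce call ends and the next begins.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Simulates "sort by full key, group by course, keep the first N of each group". */
final class GroupingSketch {

    static final class Row {
        final String course;
        final String name;
        final double score;

        Row(String course, String name, double score) {
            this.course = course;
            this.name = name;
            this.score = score;
        }
    }

    /** Full sort order: course ascending, then score descending. */
    static final Comparator<Row> SORT = (a, b) -> {
        int byCourse = a.course.compareTo(b.course);
        return byCourse != 0 ? byCourse : Double.compare(b.score, a.score);
    };

    /** Grouping rule: rows with the same course belong to one group. */
    static final Comparator<Row> GROUP = (a, b) -> a.course.compareTo(b.course);

    static List<String> topNPerGroup(List<Row> rows, int n) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(SORT);

        List<String> out = new ArrayList<>();
        Row groupStart = null;
        int emitted = 0;
        for (Row r : sorted) {
            // A non-zero grouping result marks the start of a new group.
            if (groupStart == null || GROUP.compare(groupStart, r) != 0) {
                groupStart = r;
                emitted = 0;
            }
            if (emitted < n) {
                out.add(r.course + ":" + r.name);
                emitted++;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row("math", "a", 80), new Row("math", "b", 95),
                new Row("english", "c", 70), new Row("math", "d", 60),
                new Row("english", "e", 88));
        // prints [english:e, english:c, math:b, math:a]
        System.out.println(topNPerGroup(rows, 2));
    }
}
```

The approach only works because the sort has already placed the highest scores first within each course; the grouping step itself does no ranking.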