MapReduce — Case Study (7): Student Scores, Enhanced Edition

Date: 2022-02-22 18:24:21

Problem:

computer,huangxiaoming,85,86,41,75,93,42,85
computer,xuzheng,54,52,86,91,42
computer,huangbo,85,42,96,38
english,zhaobenshan,54,52,86,91,42,85,75
english,liuyifei,85,41,75,21,85,96,14
algorithm,liuyifei,75,85,62,48,54,96,15
computer,huangjiaju,85,75,86,85,85
english,liuyifei,76,95,86,74,68,74,48
english,huangdatou,48,58,67,86,15,33,85
algorithm,huanglei,76,95,86,74,68,74,48
algorithm,huangjiaju,85,75,86,85,85,74,86
computer,huangdatou,48,58,67,86,15,33,85
english,zhouqi,85,86,41,75,93,42,85,75,55,47,22
english,huangbo,85,42,96,38,55,47,22
algorithm,liutao,85,75,85,99,66
computer,huangzitao,85,86,41,75,93,42,85
math,wangbaoqiang,85,86,41,75,93,42,85
computer,liujialing,85,41,75,21,85,96,14,74,86
computer,liuyifei,75,85,62,48,54,96,15
computer,liutao,85,75,85,99,66,88,75,91
computer,huanglei,76,95,86,74,68,74,48
english,liujialing,75,85,62,48,54,96,15
math,huanglei,76,95,86,74,68,74,48
math,huangjiaju,85,75,86,85,85,74,86
math,liutao,48,58,67,86,15,33,85
english,huanglei,85,75,85,99,66,88,75,91
math,xuzheng,54,52,86,91,42,85,75
math,huangxiaoming,85,75,85,99,66,88,75,91
math,liujialing,85,86,41,75,93,42,85,75
english,huangxiaoming,85,86,41,75,93,42,85
algorithm,huangdatou,48,58,67,86,15,33,85
algorithm,huangzitao,85,86,41,75,93,42,85,75

1. Data Description

The number of fields per line is not fixed. The first field is the course name (four courses in total: computer, math, english, algorithm), the second field is the student name, and every remaining field is the score from one exam.
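As a minimal standalone sketch (not part of the jobs below), parsing one such variable-length record could look like this:

// Sketch: parse one record of the form course,name,score1,score2,...
public class RecordParseDemo {
	public static void main(String[] args) {
		String line = "computer,huangxiaoming,85,86,41,75,93,42,85";
		String[] fields = line.split(",");
		String course = fields[0];  // "computer"
		String name = fields[1];    // "huangxiaoming"
		double sum = 0;
		for (int i = 2; i < fields.length; i++) { // scores start at index 2
			sum += Double.parseDouble(fields[i]);
		}
		double avg = sum / (fields.length - 2);
		System.out.println(course + "," + name + "," + avg); // 507 / 7 = 72.428...
	}
}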

2. Requirements:
1. For each course, count the number of participating students and compute the course average score.

2. For each course, compute the average score of every participating student, write each course's results to its own file (one result file per course), sort by average score from high to low, and keep one decimal place.

3. For each course, find the student with the highest average score and output the course, name, and average score.

Requirement: wrap the three fields course, name, and average score into a custom object and use it as the key.

Problem 1:

1. For each course, count the number of participating students and compute the course average score.

Approach:

Use the course as the key: compute each student's average score in the map phase, then average those per-student averages in the reduce phase to obtain the course average.
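
In sketch form (the averages below are computed from the corresponding input lines and rounded for readability), the key/value flow is:

map output:    (computer, 72.4)   // huangxiaoming: (85+86+41+75+93+42+85) / 7
               (computer, 65.0)   // xuzheng: (54+52+86+91+42) / 5
               ...
reduce input:  computer -> [72.4, 65.0, ...]
reduce output: (computer, "count \t average")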

Code:

package practice6;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Requirement: for each course, count the participating students and compute the course average score.
 * @author potter
 *
 */
public class Practice6 {

	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
//		conf.set("fs.defaultFS", "hdfs://potter2:9000");
//		System.setProperty("HADOOP_USER_NAME", "potter");
		FileSystem fs = FileSystem.get(conf);
		
		Job job = Job.getInstance(conf);
		job.setJarByClass(Practice6.class);
		job.setMapperClass(Practice6Mapper.class);
		job.setReducerClass(Practice6Reducer.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(DoubleWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		Path input = new Path("D:\\practice\\input6\\work6.txt");
		Path output = new Path("D:\\practice\\input6\\output1");
		FileInputFormat.setInputPaths(job, input);
		FileOutputFormat.setOutputPath(job, output);
		
		if (fs.exists(output)) {
			fs.delete(output,true);
		}
		boolean isdone = job.waitForCompletion(true);
		System.exit(isdone ? 0 : 1);
		
	}
	public static class Practice6Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable>{
		/**
		 * Input lines look like:
		 * computer,huangxiaoming,85,86,41,75,93,42,85
		 * computer,xuzheng,54,52,86,91,42
		 * Emit (course, student's average score) pairs.
		 */
		Text text1 = new Text();
		DoubleWritable dw = new DoubleWritable();
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] split = value.toString().split(",");
			String course = split[0];
			double sum = 0;
			int count = 0;
			// Sum all exam scores; the scores start at array index 2.
			for (int i = 2; i < split.length; i++) {
				sum += Double.parseDouble(split[i]);
				count++;
			}
			double avg = sum / count;
			text1.set(course);
			dw.set(avg);
			context.write(text1, dw);
		}
	}
	public static class Practice6Reducer extends Reducer<Text, DoubleWritable, Text, Text>{
		Text text = new Text();
		@Override
		protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
				throws IOException, InterruptedException {
			int count = 0;
			double sum = 0;
			// Average the per-student averages to get the course average.
			for (DoubleWritable studentAvg : values) {
				sum += studentAvg.get();
				count++;
			}
			double avg = sum / count;
			String result = count + "\t" + avg;
			text.set(result);
			context.write(key, text);
		}
	}
}
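Run locally, this job should produce a single part-r-00000 file under D:\practice\input6\output1 with one line per course in the form course \t participant count \t average (the average is printed as a raw double here, since Problem 1 does not ask for rounding).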

Problem 2:

2. For each course, compute the average score of every participating student, write each course's results to its own file (one result file per course), sort by average score from high to low, and keep one decimal place.

Approach:

To write the results to separate files, set the number of reduce tasks with setNumReduceTasks and route records to reducers with a custom Partitioner. The averages also have to be sorted, which is done by wrapping the fields in a custom object and defining the sort order by implementing the WritableComparable interface.

Main program code:

package practice6;

import java.io.IOException;
import java.text.DecimalFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * For each course, compute every participating student's average score and write
 * the results to one file per course, sorted by average score from high to low,
 * with one decimal place.
 * @author potter
 *
 */
public class Practice7 {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
//		conf.set("fs.defaultFS", "hdfs://potter2:9000");
//		System.setProperty("HADOOP_USER_NAME", "potter");
		FileSystem fs = FileSystem.get(conf); // defaults to the local file system
		
		Job job = Job.getInstance(conf);
		job.setJarByClass(Practice7.class);
		job.setMapperClass(Practice7Mapper.class);
		job.setReducerClass(Practice7Reducer.class);
		job.setMapOutputKeyClass(Practice7Bean.class);
		job.setMapOutputValueClass(NullWritable.class);
		job.setOutputKeyClass(Practice7Bean.class);
		job.setOutputValueClass(NullWritable.class);
		
		job.setPartitionerClass(Practice7Partitioner.class); // set the partitioner
		job.setNumReduceTasks(4); // one reduce task per course
		
		Path input = new Path("D:\\practice\\input6\\work6.txt");
		Path output = new Path("D:\\practice\\input6\\output2");
		
		FileInputFormat.setInputPaths(job, input);
		FileOutputFormat.setOutputPath(job, output);
		
		if (fs.exists(output)) {
			fs.delete(output,true);
		}
		boolean isdone = job.waitForCompletion(true);
		System.exit(isdone ? 0 : 1);
		
	}
	public static class Practice7Mapper extends Mapper<LongWritable, Text, Practice7Bean, NullWritable>{
		Practice7Bean pg = new Practice7Bean();
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// Example input line: algorithm,huangzitao,85,86,41,75,93,42,85,75
			String[] split = value.toString().split(",");
			String course = split[0];
			String name = split[1];
			double sum = 0;
			int count = 0;
			for (int i = 2; i < split.length; i++) {
				count++;
				sum += Double.parseDouble(split[i]);
			}
			double avg = sum / count;
			// Keep one decimal place; DecimalFormat rounds half-to-even by default
			// (e.g. 65.25 -> 65.2).
			DecimalFormat df = new DecimalFormat("0.0");
			String format = df.format(avg);
			pg.setCourse(course);
			pg.setName(name);
			pg.setFormat(format);
			context.write(pg, NullWritable.get());
		}
	}
	public static class Practice7Reducer extends Reducer<Practice7Bean, NullWritable,Practice7Bean, NullWritable>{
		@Override
		protected void reduce(Practice7Bean key, Iterable<NullWritable> values, Context context)
				throws IOException, InterruptedException {
			
				context.write(key, NullWritable.get());
			
		}
	}
}
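Because every (course, name) pair ends up as a distinct key (see the tie-breaking compareTo in the bean below), each reduce group contains exactly one student and the reducer writes one line per student.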

Partitioner code:

package practice6;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Partitioner: routes each course to its own reducer.
 * @author potter
 *
 */
public class Practice7Partitioner extends Partitioner<Practice7Bean, NullWritable>{

	@Override
	public int getPartition(Practice7Bean key, NullWritable value, int numPartitions) {
		// The key carries course, name, and average score; partition by course alone.
		if (key.getCourse().equals("math")) {
			return 0;
		}else if (key.getCourse().equals("english")) {
			return 1;
		}else if (key.getCourse().equals("algorithm")) {
			return 2;
		}else{
			return 3;
		}
	}
}
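The hard-coded if/else chain works because the four course names are fixed and setNumReduceTasks(4) matches them exactly. As a sketch of a slightly more general variant (same assumption of a known course list; the class name CoursePartitioner is made up for illustration):

package practice6;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Table-driven partitioner; behaviorally equivalent to the if/else version above.
 */
public class CoursePartitioner extends Partitioner<Practice7Bean, NullWritable> {

	private static final Map<String, Integer> COURSE_TO_PARTITION = new HashMap<>();
	static {
		COURSE_TO_PARTITION.put("math", 0);
		COURSE_TO_PARTITION.put("english", 1);
		COURSE_TO_PARTITION.put("algorithm", 2);
		COURSE_TO_PARTITION.put("computer", 3);
	}

	@Override
	public int getPartition(Practice7Bean key, NullWritable value, int numPartitions) {
		// Fall back to the last partition for any unexpected course name.
		return COURSE_TO_PARTITION.getOrDefault(key.getCourse(), numPartitions - 1);
	}
}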

Bean (entity class) code:

package practice6;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
/**
 * Key bean: course, student name, and formatted average score.
 * @author potter
 */
public class Practice7Bean implements WritableComparable<Practice7Bean>{

	private String name;
	private String course;
	private String format;
	// A no-arg constructor is required so Hadoop can deserialize the bean.
	public Practice7Bean() {
	}
	public Practice7Bean(String name, String course, String format) {
		super();
		this.name = name;
		this.course = course;
		this.format = format;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public String getCourse() {
		return course;
	}
	public void setCourse(String course) {
		this.course = course;
	}
	public String getFormat() {
		return format;
	}
	public void setFormat(String format) {
		this.format = format;
	}
	@Override
	public String toString() {
		return course + "," + name + "," + format;
	}
	@Override
	public void write(DataOutput out) throws IOException {

		out.writeUTF(name);
		out.writeUTF(course);
		out.writeUTF(format);
	}
	@Override
	public void readFields(DataInput in) throws IOException {
		
		name = in.readUTF();
		course = in.readUTF();
		format = in.readUTF();
		
	}
	@Override
	public int compareTo(Practice7Bean o) {
		// Sort by average score, descending. Compare numerically: a plain string
		// comparison would mis-order averages below 10 (e.g. "9.5" > "85.0").
		int diff = Double.compare(Double.parseDouble(o.format), Double.parseDouble(this.format));
		if (diff != 0) {
			return diff;
		}
		// Tie-break on the name so that two students with the same average stay
		// distinct keys; otherwise the reduce phase merges them into one output line.
		return this.name.compareTo(o.name);
	}
}

Output:

part-r-00000:

math,huangxiaoming,83.0
math,huangjiaju,82.3
math,huanglei,74.4
math,liujialing,72.8
math,wangbaoqiang,72.4
math,xuzheng,69.3
math,liutao,56.0

part-r-00001:

english,huanglei,83.0
english,liuyifei,74.4
english,huangxiaoming,72.4
english,zhaobenshan,69.3
english,zhouqi,64.2
english,liujialing,62.1
english,liuyifei,59.6
english,huangdatou,56.0
english,huangbo,55.0

part-r-00002:

algorithm,huangjiaju,82.3
algorithm,liutao,82.0
algorithm,huanglei,74.4
algorithm,huangzitao,72.8
algorithm,liuyifei,62.1
algorithm,huangdatou,56.0

part-r-00003:

computer,huangjiaju,83.2
computer,liutao,83.0
computer,huanglei,74.4
computer,huangxiaoming,72.4
computer,huangzitao,72.4
computer,huangbo,65.2
computer,xuzheng,65.0
computer,liujialing,64.1
computer,liuyifei,62.1
computer,huangdatou,56.0

Problem 3:

3. For each course, find the student with the highest average score and output the course, name, and average score.

Approach:

This problem involves both sorting and grouping. Grouping is configured with a WritableComparator that defines the grouping fields. Note the relationship between the grouping fields and the sort fields: the grouping fields must be a leading prefix of the sort fields.

For example, if the sort order is a, b, c, d, e, then the grouping key can only be one of:

a   /   a,b   /   a,b,c   /   a,b,c,d   /   a,b,c,d,e   — fields may not be skipped.

In other words, the sort fields must contain at least the grouping fields, as a leading subset. Here the keys are sorted by (course ascending, average descending) and grouped by course alone, so each reduce call receives one course's records and the first key in the group belongs to the student with the highest average.

Main program code:

package practice6;

import java.io.IOException;
import java.text.DecimalFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * For each course, find the student with the highest average score: course, name, and average.
 * @author potter
 */
public class Practice8 {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
//		conf.set("fs.defaultFS", "hdfs://potter2:9000");
//		System.setProperty("HADOOP_USER_NAME", "potter");
		FileSystem fs = FileSystem.get(conf);
		
		Job job = Job.getInstance(conf);
		job.setJarByClass(Practice8.class);
		job.setMapperClass(Practice8Mapper.class);
		job.setReducerClass(Practice8Reducer.class);
		job.setMapOutputKeyClass(Practice8Bean.class);
		job.setMapOutputValueClass(NullWritable.class);
		job.setOutputKeyClass(Practice8Bean.class);
		job.setOutputValueClass(NullWritable.class);

		// Set the grouping strategy: which keys are placed into the same reduce group.
		job.setGroupingComparatorClass(Practice8Group.class); // register the grouping comparator
		
		Path input = new Path("D:\\practice\\input6\\work6.txt");
		Path output = new Path("D:\\practice\\input6\\output3");
		
		FileInputFormat.setInputPaths(job, input);
		FileOutputFormat.setOutputPath(job, output);
		
		if (fs.exists(output)) {
			fs.delete(output,true);
		}
		boolean isdone = job.waitForCompletion(true);
		System.exit(isdone ? 0 : 1);
	}
	public static class Practice8Mapper extends Mapper<LongWritable, Text, Practice8Bean, NullWritable>{
		Practice8Bean pb = new Practice8Bean();
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] split = value.toString().split(",");
			String course = split[0];
			String name = split[1];
			double sum = 0;
			int count = 0;
			for (int i = 2; i < split.length; i++) {
				count++;
				sum += Double.parseDouble(split[i]);
			}
			double avg = sum / count;
			// Keep one decimal place.
			DecimalFormat df = new DecimalFormat("0.0");
			String format = df.format(avg);
			pb.setCourse(course);
			pb.setName(name);
			pb.setFormat(format);
			context.write(pb, NullWritable.get());
		}
	}
	public static class Practice8Reducer extends Reducer<Practice8Bean, NullWritable, Practice8Bean, NullWritable>{
		@Override
		protected void reduce(Practice8Bean key, Iterable<NullWritable> values, Context context)
				throws IOException, InterruptedException {
			// Thanks to the grouping comparator, one reduce call covers one course,
			// with keys sorted by average descending, so the first key already is
			// the student with the highest average for that course.
			context.write(key, NullWritable.get());
		}
	}
}

Bean (entity class) code:

package practice6;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
/**
 * Key bean: course, student name, and formatted average score.
 * @author potter
 */
public class Practice8Bean implements WritableComparable<Practice8Bean>{

	private String name;
	private String course;
	private String format;
	// A no-arg constructor is required so Hadoop can deserialize the bean.
	public Practice8Bean() {
	}
	public Practice8Bean(String name, String course, String format) {
		super();
		this.name = name;
		this.course = course;
		this.format = format;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public String getCourse() {
		return course;
	}
	public void setCourse(String course) {
		this.course = course;
	}
	public String getFormat() {
		return format;
	}
	public void setFormat(String format) {
		this.format = format;
	}
	@Override
	public String toString() {
		return course + "," + name + "," + format;
	}
	@Override
	public void write(DataOutput out) throws IOException {

		out.writeUTF(name);
		out.writeUTF(course);
		out.writeUTF(format);
	}
	@Override
	public void readFields(DataInput in) throws IOException {
		
		name = in.readUTF();
		course = in.readUTF();
		format = in.readUTF();
		
	}
	// Map-output keys are sorted according to this method.
	@Override
	public int compareTo(Practice8Bean o) {
		// Writing this.field first gives ascending order; o.field first gives descending.
		// Compare by course ascending first, so the grouping comparator can group on course.
		int diff = this.course.compareTo(o.course);
		if (diff != 0) {
			return diff;
		}
		// Within a course, sort by average score descending. Compare numerically:
		// a string comparison would mis-order averages below 10 (e.g. "9.5" > "85.0").
		return Double.compare(Double.parseDouble(o.format), Double.parseDouble(this.format));
	}
}

Grouping comparator code:

package practice6;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator: groups the sorted keys by course only, so that all
 * records of one course land in a single reduce call.
 * @author potter
 *
 */
public class Practice8Group extends WritableComparator{

	// The superclass constructor must be called to register the key class.
	public Practice8Group() {
		super(Practice8Bean.class,true);
	}
	
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		
		Practice8Bean s1 = (Practice8Bean) a;
		Practice8Bean s2 = (Practice8Bean) b;
		
		return s1.getCourse().compareTo(s2.getCourse());
	}
}
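
With the sample data, the expected output (derived from the Problem 2 results above: the top line of each part file, with courses sorted ascending) should be a single part-r-00000 containing:

algorithm,huangjiaju,82.3
computer,huangjiaju,83.2
english,huanglei,83.0
math,huangxiaoming,83.0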