Hadoop基础-Map端链式编程之MapReduce统计TopN示例

时间:2023-01-15 20:14:59

         Hadoop基础-Map端链式编程之MapReduce统计TopN示例

                                    作者:尹正杰

版权声明:原创作品,谢绝转载!否则将追究法律责任。

一.项目需求

  对“temp.txt”中的数据进行分析,统计出各个年份(每行下标15~18的字符,对应代码中的 substring(15, 19))总排行前十的最高气温(每行下标87~91的字符,对应代码中的 substring(87, 92))。由于博客园无法上传大文件的文本,因此我把该文本的内容放在了博客园的另一个链接里(需要的戳我);如果网页打不开,也可以去百度云盘下载副本,链接:https://pan.baidu.com/s/12aZFcO2XoegUGMAbS--n6Q 密码:7n91。

Hadoop基础-Map端链式编程之MapReduce统计TopN示例

二.代码实现

 /*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.mrchain; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException; public class CompKey implements WritableComparable<CompKey> { private String year;
private int temp;
/**
* 重写CompKey对年份和气温排序
*/
public int compareTo(CompKey o) {
if(this.getYear().equals(o.getYear())){
return o.getTemp() - this.getTemp();
}
return this.getYear().compareTo(o.getYear()); } public void write(DataOutput out) throws IOException {
out.writeUTF(year);
out.writeInt(temp); } public void readFields(DataInput in) throws IOException {
year = in.readUTF();
temp = in.readInt(); } public String getYear() {
return year;
} public void setYear(String year) {
this.year = year;
} public int getTemp() {
return temp;
} public void setTemp(int temp) {
this.temp = temp;
} @Override
public String toString() {
return year + '\t' +temp ;
}
}

CompKey.java 文件内容

 /*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.mrchain; import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator; public class MyGroupComparator extends WritableComparator { public MyGroupComparator() {
super(CompKey.class,true);
} public int compare(WritableComparable a, WritableComparable b) {
CompKey ck1 = (CompKey) a;
CompKey ck2 = (CompKey) b;
return ck1.getYear().compareTo(ck2.getYear());
}
}

MyGroupComparator.java 文件内容

 /*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.mrchain; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

/**
 * First mapper of the chain: extracts the year and the temperature from a
 * fixed-width text record and emits them as {@code (year, temp)}.
 */
public class ChainMapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {

    /** Character offsets (begin inclusive, end exclusive) of the year field. */
    private static final int YEAR_BEGIN = 15;
    private static final int YEAR_END = 19;

    /** Character offsets (begin inclusive, end exclusive) of the temperature field. */
    private static final int TEMP_BEGIN = 87;
    private static final int TEMP_END = 92;

    /** Counter group / names used to report records that could not be parsed. */
    private static final String COUNTER_GROUP = "ChainMapper1";
    private static final String SHORT_RECORDS = "SHORT_RECORDS";
    private static final String BAD_TEMPERATURE = "BAD_TEMPERATURE";

    /**
     * Parses one input line and writes {@code (year, temp)}.
     *
     * <p>Lines too short to contain the temperature field, or whose temperature
     * field is not an integer, are skipped and counted instead of failing the task.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one line of the input file
     * @param context task context used for output and counters
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();

        // substring(TEMP_BEGIN, TEMP_END) would throw on a shorter line.
        if (line.length() < TEMP_END) {
            context.getCounter(COUNTER_GROUP, SHORT_RECORDS).increment(1);
            return;
        }

        String year = line.substring(YEAR_BEGIN, YEAR_END);

        int temp;
        try {
            temp = Integer.parseInt(line.substring(TEMP_BEGIN, TEMP_END));
        } catch (NumberFormatException e) {
            context.getCounter(COUNTER_GROUP, BAD_TEMPERATURE).increment(1);
            return;
        }

        context.write(new Text(year), new IntWritable(temp));
    }
}

ChainMapper1.java 文件内容

 /*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.mrchain; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

/**
 * Second mapper of the chain: converts {@code (year, temp)} pairs into
 * {@link CompKey} keys with a {@link NullWritable} value.
 */
public class ChainMapper2 extends Mapper<Text, IntWritable, CompKey, NullWritable> {

    /**
     * Emits a composite key for every reading whose value is not 9999.
     *
     * @param key     the year
     * @param value   the temperature reading
     * @param context task context used for output
     */
    @Override
    protected void map(Text key, IntWritable value, Context context)
            throws IOException, InterruptedException {
        final int reading = value.get();

        // 9999 readings are dropped (presumably the data set's "missing" marker; confirm against the data format).
        if (reading == 9999) {
            return;
        }

        CompKey composite = new CompKey();
        composite.setYear(key.toString());
        composite.setTemp(reading);
        context.write(composite, NullWritable.get());
    }
}

ChainMapper2.java 文件内容

 /*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.mrchain; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException;
import java.util.Iterator; public class ChainReducer1 extends Reducer<CompKey, NullWritable, Text, IntWritable> { //由于分组对比器设定,相同的year放在一个分组,因此,在一个reduce循环中,得到的数据均为同一年份的数据
protected void reduce(CompKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
String year = key.getYear();
Iterator<NullWritable> it = values.iterator();
int i = 0;
while (it.hasNext()){
System.out.println(key.toString());
int temp = key.getTemp();
context.write(new Text(year), new IntWritable(temp));
it.next();
i++;
if(i >= 10){
break;
}
}
}
}

ChainReducer1.java 文件内容

 /*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.mrchain; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class ChainReducer2 extends Mapper<Text, IntWritable, Text,IntWritable> {
protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
int temp = value.get();
//取得奇数气温
if( temp % 2 == 1 ){
context.write(key, new IntWritable(temp));
} }
}

ChainReducer2.java 文件内容

 /*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.mrchain; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver that wires ChainMapper1 -> ChainMapper2 on the map side and
 * ChainReducer1 -> ChainReducer2 on the reduce side, then runs the job
 * against the local file system.
 *
 * <p>Usage: {@code ChainApp [inputPath [outputPath]]}. Omitted arguments fall
 * back to the built-in default paths.
 */
public class ChainApp {

    /** Input path used when no first argument is given. */
    private static final String DEFAULT_INPUT = "D:\\10.Java\\IDE\\yhinzhengjieData\\MyHadoop\\temp";

    /** Output path used when no second argument is given. */
    private static final String DEFAULT_OUTPUT = "D:\\10.Java\\IDE\\yhinzhengjieData\\MyHadoop\\out";

    /**
     * Configures and runs the chained job.
     *
     * @param args optional input path ({@code args[0]}) and output path ({@code args[1]})
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        Path inPath = new Path(args.length > 0 ? args[0] : DEFAULT_INPUT);
        Path outPath = new Path(args.length > 1 ? args[1] : DEFAULT_OUTPUT);

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");

        Job job = Job.getInstance(conf);
        FileSystem fs = FileSystem.get(conf);

        job.setJobName("Chain");
        job.setJarByClass(ChainApp.class);

        // Map side of the chain: a mapper may be followed by any number of mappers.
        ChainMapper.addMapper(job, ChainMapper1.class, LongWritable.class, Text.class,
                Text.class, IntWritable.class, conf);
        ChainMapper.addMapper(job, ChainMapper2.class, Text.class, IntWritable.class,
                CompKey.class, NullWritable.class, conf);

        // Reduce side of the chain: one reducer, which may only be followed by mappers.
        ChainReducer.setReducer(job, ChainReducer1.class, CompKey.class, NullWritable.class,
                Text.class, IntWritable.class, conf);
        ChainReducer.addMapper(job, ChainReducer2.class, Text.class, IntWritable.class,
                Text.class, IntWritable.class, conf);

        job.setGroupingComparatorClass(MyGroupComparator.class);

        // Remove a pre-existing output directory before the job runs.
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        FileInputFormat.addInputPath(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);

        // Surface job failure to the caller through a non-zero exit status.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }
}