项目介绍
本项目我们使用明星搜索指数数据,分别统计出搜索指数最高的男明星和女明星。
思路分析
基于项目的需求,我们通过以下几步完成:
1、编写 Mapper类,按需求将数据集解析为 key=gender,value=name+hotIndex,然后
输出。
2、编写 Combiner 类,合并 Mapper 输出结果,然后输出给 Reducer。
3、编写 Partitioner 类,按性别,将结果指定给不同的 Reduce 执行。
4、编写 Reducer 类,分别统计出男、女明星的最高搜索指数。
5、编写 run 方法执行 MapReduce 任务。
数据格式
代码
package com.mapreduce; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * * 统计分别统计出男女明星最大搜索指数 */ public class Star extends Configured implements Tool { /** * Mapper 解析明星数据 * @input key=偏移量 value=明星数据 * @output key=gender value=name+hotIndex */ public static class ActorMapper extends Mapper<Object, Text, Text, Text> { public void map(Object key, Text value, Context context) throws IOException, InterruptedException { //value=name+gender+hotIndex String[] tokens = value.toString().split("\t");//使用分隔符\t,将数据解析为数组 tokens String gender = tokens[1].trim();//性别 String nameHotIndex = tokens[0] + "\t" + tokens[2];//名称和关注指数 //输出key=gender value=name+hotIndex context.write(new Text(gender), new Text(nameHotIndex)); } } /** * * Partitioner 根据sex选择分区 * */ public static class ActorPartitioner extends Partitioner<Text, Text> { @Override public int getPartition(Text key, Text value, int numReduceTasks) { String sex = key.toString();//按性别分区 // 默认指定分区 0 if(numReduceTasks==0) return 0; //性别为male 选择分区0 if(sex.equals("male")) return 0; //性别为female 选择分区1 if(sex.equals("female")) return 1 % numReduceTasks; //其他性别 选择分区2 else return 2 % numReduceTasks; } } /** * * 定义Combiner 合并 Mapper 输出结果 * */ public static class ActorCombiner extends Reducer<Text, Text, Text, Text> { private Text text = new Text(); @Override public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException { int maxHotIndex = Integer.MIN_VALUE; int hotIndex = 0; String name=""; for (Text val : values) { String[] valTokens = val.toString().split("\\t"); hotIndex = Integer.parseInt(valTokens[1]); if(hotIndex>maxHotIndex){ name = valTokens[0]; maxHotIndex = hotIndex; } } text.set(name+"\t"+maxHotIndex); context.write(key, text); } } /** * * Reducer 统计男、女明星最高搜索指数 * @input key=gender value=name+hotIndex * @output key=name value=gender+hotIndex(max) */ public static class ActorReducer extends Reducer<Text, Text, Text, Text> { @Override public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException { int maxHotIndex = Integer.MIN_VALUE; String name = " "; int hotIndex = 0; // 根据key,迭代 values 集合,求出最高搜索指数 for (Text val : values) { String[] valTokens = val.toString().split("\\t"); hotIndex = Integer.parseInt(valTokens[1]); if (hotIndex > maxHotIndex) { name = valTokens[0]; maxHotIndex = hotIndex; } } context.write(new Text(name), new Text( key + "\t"+ maxHotIndex)); } } /** * 任务驱动方法 * @param args * @return * @throws Exception */ @Override public int run(String[] args) throws Exception { // TODO Auto-generated method stub Configuration conf = new Configuration();//读取配置文件 Path mypath = new Path(args[1]); FileSystem hdfs = mypath.getFileSystem(conf); if (hdfs.isDirectory(mypath)) { hdfs.delete(mypath, true); } Job job = new Job(conf, "star");//新建一个任务 job.setJarByClass(Star.class);//主类 job.setNumReduceTasks(2);//reduce的个数设置为2 job.setPartitionerClass(ActorPartitioner.class);//设置Partitioner类 job.setMapperClass(ActorMapper.class);//Mapper job.setMapOutputKeyClass(Text.class);//map 输出key类型 job.setMapOutputValueClass(Text.class);//map 输出value类型 job.setCombinerClass(ActorCombiner.class);//设置Combiner类 job.setReducerClass(ActorReducer.class);//Reducer job.setOutputKeyClass(Text.class);//输出结果 key类型 job.setOutputValueClass(Text.class);//输出结果 value类型 FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径 FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径 job.waitForCompletion(true);//提交任务 return 0; } /** * main 方法 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { String[] args0 = { "hdfs://cdh001:9000/actor/actor.txt", "hdfs://cdh001:9000/actor/out/" }; int ec = ToolRunner.run(new Configuration(), new Star(), args0); System.exit(ec); } }
代码分析
Map
每次调用map(LongWritable key, Text value, Context context)解析一行数据。每行数据存储在value参数值中。然后根据'\t'分隔符,解析出明星姓名,性别和搜索指数
public static class ActorMapper extends Mapper< Object, Text, Text, Text> {
public void map(Object key, Text value, Context context) throws IOException, InterruptedExcept{
//value=name+gender+hotIndex
String[] tokens = value.toString().split("\t"); String gender = tokens[1].trim();//性别
String nameHotIndex = tokens[0] + "\t" + tokens[2];//名称和搜索指数 context.write(new Text(gender), new Text(nameHotIndex));
}
}
map()函数期望的输出结果Map = {key = gender, value = name+hotIndex}
Combiner
对 map 端的输出结果,先进行一次合并,减少数据的网络输出。
public static class ActorCombiner extends Reducer< Text, Text, Text, Text> {
private Text text = new Text();
@Override
public void reduce(Text key, Iterable< Text> values, Context context) throws IOException, InterruptedException{
int maxHotIndex = Integer.MIN_VALUE;
int hotIndex = 0;
String name="";
for (Text val : values) {
String[] valTokens = val.toString().split("\\t"); hotIndex = Integer.parseInt(valTokens[1]); if(hotIndex>maxHotIndex){
name = valTokens[0]; maxHotIndex = hotIndex;
}
}
text.set(name+"\t"+maxHotIndex); context.write(key, text);
}
}
Partitioner
根据明星性别对数据进行分区,将 Mapper 的输出结果均匀分布在 reduce 上。
/**
* Partitioner 根据sex选择分区
*/
public static class ActorPartitioner extends Partitioner<Text, Text> {
@Override
public int getPartition(Text key, Text value, int numReduceTasks) {
String sex = key.toString();//按性别分区
// 默认指定分区 0
if(numReduceTasks==0)
return 0;
//性别为male 选择分区0
if(sex.equals("male"))
return 0;
//性别为female 选择分区1
if(sex.equals("female"))
return 1 % numReduceTasks;
//其他性别 选择分区2
else
return 2 % numReduceTasks;
}
}
Reduce
调用reduce(key, Iterable< Text> values, context)方法来处理每个key和values的集
合。我们在values集合中,计算出明星的最大搜索指数。
/**
*
* Reducer 统计男、女明星最高搜索指数
* @input key=gender value=name+hotIndex
* @output key=name value=gender+hotIndex(max)
*/
public static class ActorReducer extends Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {
int maxHotIndex = Integer.MIN_VALUE;
String name = " ";
int hotIndex = 0;
// 根据key,迭代 values 集合,求出最高搜索指数
for (Text val : values) {
String[] valTokens = val.toString().split("\\t");
hotIndex = Integer.parseInt(valTokens[1]);
if (hotIndex > maxHotIndex) {
name = valTokens[0];
maxHotIndex = hotIndex;
}
}
context.write(new Text(name), new Text( key + "\t"+ maxHotIndex));
}
}
reduce()函数期望的输出结果Reduce = {key = name, value = gender+max(hotIndex)}
Run 驱动方法
在 run 方法中,设置任务执行各种信息。
/**
* 任务驱动方法
* @param args
* @return
* @throws Exception
*/
@Override
public int run(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();//读取配置文件
Path mypath = new Path(args[1]);
FileSystem hdfs = mypath.getFileSystem(conf);
if (hdfs.isDirectory(mypath)) {
hdfs.delete(mypath, true);
}
Job job = new Job(conf, "star");//新建一个任务
job.setJarByClass(Star.class);//主类
job.setNumReduceTasks(2);//reduce的个数设置为2
job.setPartitionerClass(ActorPartitioner.class);//设置Partitioner类
job.setMapperClass(ActorMapper.class);//Mapper
job.setMapOutputKeyClass(Text.class);//map 输出key类型
job.setMapOutputValueClass(Text.class);//map 输出value类型
job.setCombinerClass(ActorCombiner.class);//设置Combiner类
job.setReducerClass(ActorReducer.class);//Reducer
job.setOutputKeyClass(Text.class);//输出结果 key类型
job.setOutputValueClass(Text.class);//输出结果 value类型
FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径
job.waitForCompletion(true);//提交任务
return 0;
}
编译和执行 MapReduce作业
1、IntelliJ IDEA 环境下, 将项目编译和打包为star.jar,使用SSH将 star.jar上传至hadoop的/home/hadoop/actor目录下。
2、使用cd /home/hadoop/actor切换到当前目录,通过命令行执行Hadoop作业
3 、hadoop jar star.jar com.mapreduce.Star