MapReduce编程实现明星搜索指数统计和找出人气王

时间:2021-11-23 18:22:14

项目介绍

本项目我们使用明星搜索指数数据,分别统计出搜索指数最高的男明星和女明星。

思路分析

基于项目的需求,我们通过以下几步完成:

1、编写 Mapper类,按需求将数据集解析为 key=gendervalue=name+hotIndex,然后

输出。

2、编写 Combiner 类,合并 Mapper 输出结果,然后输出给 Reducer

3、编写 Partitioner 类,按性别,将结果指定给不同的 Reduce 执行。

4、编写 Reducer 类,分别统计出男、女明星的最高搜索指数。

5、编写 run 方法执行 MapReduce 任务。

数据格式

MapReduce编程实现明星搜索指数统计和找出人气王

代码

MapReduce编程实现明星搜索指数统计和找出人气王

MapReduce编程实现明星搜索指数统计和找出人气王

MapReduce编程实现明星搜索指数统计和找出人气王

MapReduce编程实现明星搜索指数统计和找出人气王

MapReduce编程实现明星搜索指数统计和找出人气王

package com.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Partitioner;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

/**

 *

 * 统计分别统计出男女明星最大搜索指数

 */

public class Star extends Configured implements Tool {

/**

 *  Mapper 解析明星数据

 * @input key=偏移量  value=明星数据

 * @output key=gender value=name+hotIndex

 */

public static class ActorMapper extends Mapper<Object, Text, Text, Text> {

 

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

//value=name+gender+hotIndex

String[] tokens = value.toString().split("\t");//使用分隔符\t,将数据解析为数组 tokens

String gender = tokens[1].trim();//性别

String nameHotIndex = tokens[0] + "\t" + tokens[2];//名称和关注指数

//输出key=gender value=name+hotIndex

context.write(new Text(gender), new Text(nameHotIndex));

}

}

 

/**

 *

 *  Partitioner 根据sex选择分区

 *

 */

public static class ActorPartitioner extends Partitioner<Text, Text> {	 

        @Override

        public int getPartition(Text key, Text value, int numReduceTasks) {

        

            String sex = key.toString();//按性别分区

            

            // 默认指定分区 0

            if(numReduceTasks==0)

            	return 0;

            

            //性别为male 选择分区0

            if(sex.equals("male"))             

                return 0;

            //性别为female 选择分区1

            if(sex.equals("female"))

            	return 1 % numReduceTasks;

            //其他性别 选择分区2

            else

                return 2 % numReduceTasks;

           

        }

    }

 

/**

 *

 *  定义Combiner 合并 Mapper 输出结果

 *

 */

public static class ActorCombiner extends Reducer<Text, Text, Text, Text> {

private Text text = new Text();

@Override

public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {

int maxHotIndex = Integer.MIN_VALUE;

int hotIndex = 0;

String name="";

for (Text val : values) {

String[] valTokens = val.toString().split("\\t");

hotIndex = Integer.parseInt(valTokens[1]);

if(hotIndex>maxHotIndex){

name = valTokens[0];

maxHotIndex = hotIndex;

}

}

text.set(name+"\t"+maxHotIndex);

context.write(key, text);

}

}

/**

 *

 *  Reducer 统计男、女明星最高搜索指数

 * @input key=gender  value=name+hotIndex

 * @output key=name value=gender+hotIndex(max)

 */

public static class ActorReducer extends Reducer<Text, Text, Text, Text> {

 

@Override

public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {

 

int maxHotIndex = Integer.MIN_VALUE;

 

String name = " ";

int hotIndex = 0;

// 根据key,迭代 values 集合,求出最高搜索指数

for (Text val : values) {

String[] valTokens = val.toString().split("\\t");

hotIndex = Integer.parseInt(valTokens[1]);

if (hotIndex > maxHotIndex) {

name = valTokens[0];

maxHotIndex = hotIndex;

}

}

context.write(new Text(name), new Text( key + "\t"+ maxHotIndex));

}

}

 

/**

 *  任务驱动方法

 * @param args

 * @return

 * @throws Exception

 */

@Override

public int run(String[] args) throws Exception {

// TODO Auto-generated method stub

Configuration conf = new Configuration();//读取配置文件

Path mypath = new Path(args[1]);

FileSystem hdfs = mypath.getFileSystem(conf);

if (hdfs.isDirectory(mypath)) {

hdfs.delete(mypath, true);

}

 

Job job = new Job(conf, "star");//新建一个任务

job.setJarByClass(Star.class);//主类

job.setNumReduceTasks(2);//reduce的个数设置为2

job.setPartitionerClass(ActorPartitioner.class);//设置Partitioner类

job.setMapperClass(ActorMapper.class);//Mapper

job.setMapOutputKeyClass(Text.class);//map 输出key类型

job.setMapOutputValueClass(Text.class);//map 输出value类型

job.setCombinerClass(ActorCombiner.class);//设置Combiner类

job.setReducerClass(ActorReducer.class);//Reducer

job.setOutputKeyClass(Text.class);//输出结果 key类型

job.setOutputValueClass(Text.class);//输出结果 value类型

FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径

FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径

job.waitForCompletion(true);//提交任务

return 0;

}

/**

 *  main 方法

 * @param args

 * @throws Exception

 */

public static void main(String[] args) throws Exception {

String[] args0 = { "hdfs://cdh001:9000/actor/actor.txt",

"hdfs://cdh001:9000/actor/out/" };

int ec = ToolRunner.run(new Configuration(), new Star(), args0);

System.exit(ec);

}

}

代码分析

Map

 

每次调用map(LongWritable key, Text value, Context context)解析一行数据。每行数据存储在value参数值中。然后根据'\t'分隔符,解析出明星姓名,性别和搜索指数

 

public static class ActorMapper extends Mapper< Object, Text, Text, Text> {

public void map(Object key, Text value, Context context) throws IOException, InterruptedExcept{

 //value=name+gender+hotIndex

String[] tokens = value.toString().split("\t"); String gender = tokens[1].trim();//性别

 

String nameHotIndex = tokens[0] + "\t" + tokens[2];//名称和搜索指数 context.write(new Text(gender), new Text(nameHotIndex));

}

}

map()函数期望的输出结果Map = {key = gender, value = name+hotIndex}

 

Combiner

 

map 端的输出结果,先进行一次合并,减少数据的网络输出。

public static class ActorCombiner extends Reducer< Text, Text, Text, Text> {

private Text text = new Text();

@Override

public void reduce(Text key, Iterable< Text> values, Context context) throws IOException, InterruptedException{

int maxHotIndex = Integer.MIN_VALUE;

int hotIndex = 0;

String name="";

for (Text val : values) {

String[] valTokens = val.toString().split("\\t"); hotIndex = Integer.parseInt(valTokens[1]); if(hotIndex>maxHotIndex){

 

name = valTokens[0]; maxHotIndex = hotIndex;

}

}

text.set(name+"\t"+maxHotIndex); context.write(key, text);

}

}

Partitioner

 

根据明星性别对数据进行分区,将 Mapper 的输出结果均匀分布在 reduce 上。

/**

 *  Partitioner 根据sex选择分区

 */

public static class ActorPartitioner extends Partitioner<Text, Text> {  

        @Override

        public int getPartition(Text key, Text value, int numReduceTasks) {

        

            String sex = key.toString();//按性别分区

            

            // 默认指定分区 0

            if(numReduceTasks==0)

             return 0;

            

            //性别为male 选择分区0

            if(sex.equals("male"))             

                return 0;

            //性别为female 选择分区1

            if(sex.equals("female"))

             return 1 % numReduceTasks;

            //其他性别 选择分区2

            else

                return 2 % numReduceTasks;

           

        }

}

Reduce

 

调用reduce(key, Iterable< Text> values, context)方法来处理每个keyvalues的集

 

合。我们在values集合中,计算出明星的最大搜索指数。

/**

 *

 *  Reducer 统计男、女明星最高搜索指数

 * @input key=gender  value=name+hotIndex

 * @output key=name value=gender+hotIndex(max)

 */

public static class ActorReducer extends Reducer<Text, Text, Text, Text> {

 

@Override

public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {

 

int maxHotIndex = Integer.MIN_VALUE;

 

String name = " ";

int hotIndex = 0;

// 根据key,迭代 values 集合,求出最高搜索指数

for (Text val : values) {

String[] valTokens = val.toString().split("\\t");

hotIndex = Integer.parseInt(valTokens[1]);

if (hotIndex > maxHotIndex) {

name = valTokens[0];

maxHotIndex = hotIndex;

}

}

context.write(new Text(name), new Text( key + "\t"+ maxHotIndex));

}

}

reduce()函数期望的输出结果Reduce = {key = name, value = gender+max(hotIndex)}

 

Run 驱动方法

 

run 方法中,设置任务执行各种信息。

/**

 *  任务驱动方法

 * @param args

 * @return

 * @throws Exception

 */

@Override

public int run(String[] args) throws Exception {

// TODO Auto-generated method stub

Configuration conf = new Configuration();//读取配置文件

Path mypath = new Path(args[1]);

FileSystem hdfs = mypath.getFileSystem(conf);

if (hdfs.isDirectory(mypath)) {

hdfs.delete(mypath, true);

}

 

Job job = new Job(conf, "star");//新建一个任务

job.setJarByClass(Star.class);//主类

job.setNumReduceTasks(2);//reduce的个数设置为2

job.setPartitionerClass(ActorPartitioner.class);//设置Partitioner

job.setMapperClass(ActorMapper.class);//Mapper

job.setMapOutputKeyClass(Text.class);//map 输出key类型

job.setMapOutputValueClass(Text.class);//map 输出value类型

job.setCombinerClass(ActorCombiner.class);//设置Combiner

job.setReducerClass(ActorReducer.class);//Reducer

job.setOutputKeyClass(Text.class);//输出结果 key类型

job.setOutputValueClass(Text.class);//输出结果 value类型

FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径

FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径

job.waitForCompletion(true);//提交任务

return 0;

}

编译和执行 MapReduce作业

 

1IntelliJ IDEA 环境下,  将项目编译和打包为star.jar,使用SSHstar.jar上传至hadoop/home/hadoop/actor目录下。

 

2、使用cd /home/hadoop/actor切换到当前目录,通过命令行执行Hadoop作业

3 hadoop jar star.jar com.mapreduce.Star