输入(数据摘自互联网):
data1:
data2:
程序源代码:
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class CountAverage {
public static class AverageMapper extends Mapper<Object, Text, Text, Text>{
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException{
String inputline=value.toString();
StringTokenizer itr = new StringTokenizer(inputline);
String mapkey="";
String mapvalue="";
int count=0;
while (itr.hasMoreTokens()) {
if(count>0){
mapvalue=itr.nextToken();
continue;
}
mapkey=itr.nextToken();
count++;
}
context.write(new Text(mapkey),new Text(mapvalue));
}
}
public static class AverageCombiner extends Reducer<Text,Text,Text,Text> {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Double sum=0.00;
int count=0;
for(Text t:values){
sum=sum+Double.parseDouble(t.toString());
count++;
}
context.write(new Text(key),new Text(sum+"-"+count));
}
}
public static class AverageReducer extends Reducer<Text,Text,Text,DoubleWritable> {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Double sum=0.00;
int count=0;
for(Text t:values){
String[] str=t.toString().split("-");
sum+=Double.parseDouble(str[0]);
count+=Integer.parseInt(str[1]);
}
context.write(new Text(key),new DoubleWritable(sum/count));
}
}
/**
* @param args
* @throws IOException
* @throws ClassNotFoundException
* @throws InterruptedException
*/
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
Job job = new Job(conf, "count average");
job.setJarByClass(CountAverage.class);
job.setMapperClass(AverageMapper.class);
job.setCombinerClass(AverageCombiner.class);
job.setReducerClass(AverageReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/user/pu/input/*"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/pu/output/*"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
运行结果: