Computing an Average with Hadoop

Date: 2021-04-25 00:44:57

The combiner runs locally on each node, while the reducer collects results from the whole cluster. For example, take a large 1 GB file: with the old default block size of 64 MB it splits into 16 blocks. If your cluster has 5 dual-core machines, that gives 10 map slots, so the 16 blocks are spread across those 10 slots and take roughly 2 rounds to process. Suppose blocks 1 and 2 are assigned to machine 1 and blocks 3 and 4 to machine 2: once blocks 1 and 2 have been summed, a combiner runs on machine 1, and once blocks 3 and 4 are done, a combiner runs on machine 2. After all the combiners have finished, their outputs are gathered at the reducer for final processing. Note that for an average the combiner cannot simply emit a local mean, because a mean of means is not the overall mean; instead it emits a partial (sum, count) pair, and the reducer merges the pairs and divides once at the end.
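A minimal standalone sketch of that pitfall (the values are invented purely for illustration):

public class AverageOfAveragesPitfall {
    public static void main(String[] args) {
        // Hypothetical values for one key: {1.0, 2.0} land on machine 1, {6.0} on machine 2.
        // Wrong: averaging the two local means weights both machines equally.
        double meanOfMeans = ((1.0 + 2.0) / 2 + 6.0 / 1) / 2;   // 3.75
        // Right: merge the (sum, count) pairs and divide once at the end.
        double trueMean = (1.0 + 2.0 + 6.0) / (2 + 1);          // 3.0
        System.out.println(meanOfMeans + " vs " + trueMean);
    }
}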


Input (data taken from the internet):

data1, data2: the original sample lines were not preserved in the source. Each input line is a key token followed by a numeric value token, which is the format the mapper below expects.
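Purely for illustration (these values are invented, not the article's original data), input in that format might look like:

data1:
a 1.0
a 2.0
b 3.0

data2:
a 6.0
b 5.0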

Program source code:

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountAverage {
    public static class AverageMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is expected to be "key value", e.g. "a 1.0".
            StringTokenizer itr = new StringTokenizer(value.toString());
            if (itr.countTokens() < 2) {
                return; // skip malformed lines instead of emitting empty keys/values
            }
            String mapKey = itr.nextToken();   // first token: the grouping key
            String mapValue = itr.nextToken(); // second token: the numeric value
            context.write(new Text(mapKey), new Text(mapValue));
        }
    }

    public static class AverageCombiner extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Emit a partial (sum, count) pair rather than a local average:
            // averages cannot be merged across nodes, but sums and counts can.
            double sum = 0.0;
            int count = 0;
            for (Text t : values) {
                String v = t.toString();
                if (v.contains(",")) {
                    // The framework may apply the combiner more than once,
                    // so a value may already be a partial "sum,count" pair.
                    String[] parts = v.split(",");
                    sum += Double.parseDouble(parts[0]);
                    count += Integer.parseInt(parts[1]);
                } else {
                    sum += Double.parseDouble(v);
                    count++;
                }
            }
            // "," as the separator: splitting on "-" would break on negative values.
            context.write(key, new Text(sum + "," + count));
        }
    }

    public static class AverageReducer extends Reducer<Text, Text, Text, DoubleWritable> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0.0;
            int count = 0;
            for (Text t : values) {
                String v = t.toString();
                if (v.contains(",")) {
                    // Value was pre-aggregated by the combiner: a "sum,count" pair.
                    String[] parts = v.split(",");
                    sum += Double.parseDouble(parts[0]);
                    count += Integer.parseInt(parts[1]);
                } else {
                    // The combiner is an optimization and may not run at all,
                    // so a value can also be a raw number straight from the mapper.
                    sum += Double.parseDouble(v);
                    count++;
                }
            }
            context.write(key, new DoubleWritable(sum / count));
        }
    }

    /**
     * @param args
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // Job.getInstance replaces the deprecated new Job(conf, ...) constructor.
        Job job = Job.getInstance(conf, "count average");
        job.setJarByClass(CountAverage.class);
        job.setMapperClass(AverageMapper.class);
        job.setCombinerClass(AverageCombiner.class);
        job.setReducerClass(AverageReducer.class);
        // Map output and reduce output have different value types,
        // so both pairs must be declared explicitly.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/user/pu/input/*"));
        // The output path must be a directory that does not exist yet; no wildcard here.
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/pu/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
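To try the job, the class is typically packaged into a jar and submitted with the hadoop launcher, e.g. hadoop jar countaverage.jar org.apache.hadoop.examples.CountAverage (the jar name here is hypothetical); the input and output paths are hard-coded in main above, so adjust them to your own HDFS layout.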


Run results:

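The original output listing was not preserved in the source. For the hypothetical input sketched above, the reducer would emit one tab-separated line per key: a with 3.0 and b with 4.0.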