1.Combiner功能是:合并汇总(shuffle中的组件)
1)combiner是MR程序中Mapper和Reducer之外的一种组件。
2)combiner组件的父类就是Reducer,也就是说combiner是继承之reducer的,相当于reducer
3)combiner和reducer的区别在于运行的位置:Combiner是在每一个maptask所在的节点运行,然后Reducer是接收全局所有Mapper的输出结果,也就是说
combiner函数的输出作为reducer的输入。
4)combiner的意义
就是对每一个maptask的输出进行局部汇总,以减小网络传输量,
属于优化方案。
因为combiner在mapreduce过程中可能调用也肯能不调用,可能调一次也可能调多次,无法确定和控制
所以,combiner使用的原则是:有或没有都不能影响业务逻辑,使不使用combiner都不能影响最终reducer的结果。而且, combiner的输出kv应该跟reducer的输入kv类型要对应起来。因为有时使用combiner不当的话会对统计结果造成错误的结局,还不如不用。比如对所有数求平均数:
Mapper端使用combiner
3 5 7 ->(3+5+7)/3=5
2 6 ->(2+6)/2=4
Reducer
(5+4)/2=9/2 不等于(3+5+7+2+6)/5=23/5
3.combiner既然可以汇总,能不能用combiner取代reduce函数?(面试题)
虽然combiner可以帮我们减少mapper和reducer之间的数据传输量,对mapper到reducer的数据进行局部汇总,减轻reducer的工作量和减少网络IO,但是我们仍然需要用reduce函数来处理不同map输出的具有相同键的记录。
4. 使用自定义Combiner实现步骤
(
1
)自定义一个
combiner
继承
Reducer
,重写
reduce
方法,以经典的wordcount为例
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for(IntWritable v :values){ //对同一个map输出的k,v对进行按k进行一次汇总。不同map的k,v汇总必须要用reduce方法
count += v.get();
}
context.write(key, new IntWritable(count));
}
}
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for(IntWritable v :values){ //对同一个map输出的k,v对进行按k进行一次汇总。不同map的k,v汇总必须要用reduce方法
count += v.get();
}
context.write(key, new IntWritable(count));
}
}
(2)在job中设置:
job.setCombinerClass(WordcountCombiner.class);
方案二:前提是combiner和reduce方法功能一样,可以不用定义combiner,直接使用reduce当combiner.
将WordcountReducer作为combiner在WordcountDriver驱动类中指定
job.setCombinerClass(WordcountReducer.class);