combiner函数的使用注意事项和代码演示

时间:2022-10-07 18:54:43
1.Combiner功能是:合并汇总(shuffle中的组件)
1)combiner是MR程序中Mapper和Reducer之外的一种组件。
2)combiner组件的父类就是Reducer,也就是说combiner是继承之reducer的,相当于reducer
3)combiner和reducer的区别在于运行的位置:Combiner是在每一个maptask所在的节点运行,然后Reducer是接收全局所有Mapper的输出结果,也就是说 combiner函数的输出作为reducer的输入
4)combiner的意义 就是对每一个maptask的输出进行局部汇总,以减小网络传输量, 属于优化方案。
2.注意Combiner的使用要非常谨慎:
      因为combiner在mapreduce过程中可能调用也肯能不调用,可能调一次也可能调多次,无法确定和控制
      所以,combiner使用的原则是:有或没有都不能影响业务逻辑,使不使用combiner都不能影响最终reducer的结果。而且, combiner的输出kv应该跟reducer的输入kv类型要对应起来。因为有时使用combiner不当的话会对统计结果造成错误的结局,还不如不用。比如对所有数求平均数:
    Mapper端使用combiner
         3 5 7 ->(3+5+7)/3=5
         2 6 ->(2+6)/2=4
    Reducer
          (5+4)/2=9/2  不等于(3+5+7+2+6)/5=23/5
3.combiner既然可以汇总,能不能用combiner取代reduce函数?(面试题)

    虽然combiner可以帮我们减少mapper和reducer之间的数据传输量,对mapper到reducer的数据进行局部汇总,减轻reducer的工作量和减少网络IO,但是我们仍然需要用reduce函数来处理不同map输出的具有相同键的记录。

4. 使用自定义Combiner实现步骤

1 )自定义一个 combiner 继承 Reducer ,重写 reduce 方法,以经典的wordcount为例

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
       @Override
       protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
              int count = 0;
              for(IntWritable v :values){
//对同一个map输出的k,v对进行按k进行一次汇总。不同map的k,v汇总必须要用reduce方法
                     count += v.get();
              }
              context.write(key, new IntWritable(count));
       }
}
(2)在job中设置:  job.setCombinerClass(WordcountCombiner.class);

方案二:前提是combiner和reduce方法功能一样,可以不用定义combiner,直接使用reduce当combiner.
        将WordcountReducer作为combiner在WordcountDriver驱动类中指定
         job.setCombinerClass(WordcountReducer.class);