MapReduce架构和算法(2)

时间:2024-09-06 13:05:08

一个、combiner计划

每map它可能会产生大量的输出,combiner的作用是map输出端先做合并。reducer的数据量。

combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能

假设不用combiner,那么,全部的结果都是reduce完毕。效率会相对低下。

使用combiner,先完毕的map会在本地聚合,提升速度。

注意:Combiner的输出是Reducer的输入,Combiner绝不能改变终于的计算结果。

所以从我的想法来看,Combiner仅仅应该用于那种Reduce的输入key/value与输出

key/value类型全然一致,且不影响终于结果的场景。比方累加,最大值等。

Combiner仅在Map端进行数据归约。 Map之间的数据是无法归约的,因此必须使用Reducer

   Combiner的适合场景:求和,最大值,最小值等

   Combiner的不适合场景:求平均数

   举例

   假如有1T的数据。对里面的数据求和,这一个T的数据被分成非常多Block,再Map端进行读取之后所有送入Reducer端,这种话Reducer处理的数据>=1T

   可是假设再map端进行Combiner合并之后再传到Reducer之后,那么Reducer端处理的数据就非常少了。这样就体现了分布式的优势。(相反不用Combiner就根部体现不了分    布式的优势)

二、Partitioner编程

Partitioner是partitioner的基类,假设须要定制partitioner也须要继承该类。



HashPartitioner是mapreduce的默认partitioner。计算方法是

which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks。得到当前的目的reducer。

(样例以jar形式执行)



来看下默认的HashPartitioner

public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {



  public void configure(JobConf job) {}



  /** Use {@link Object#hashCode()} to partition. */

  public int getPartition(K2 key, V2 value,

                          int numReduceTasks) {

    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

  }



}

注意:这里的getPartition默认的返回值是0,返回值是分区的编号

假设我们没有自己定义分区的话。默认就仅仅有一个分区

适合场景:城市的分区。IP地址的分区,电话号码的分区等等 

分组跟排序

分组要实现RaoComparator接口 

在map和reduce阶段进行排序时,比較的是k2。

v2是不參与排序比較的。假设要想让v2也进行排序,须要把k2和v2组装成新的类,作为k2,才干參与比較。



分组时也是依照k2进行比較的。

Shuffle

MapReduce架构和算法(2)

1 每一个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性)。一旦达到阀值0.8(io.sort.spill.percent),一个后台线程把内容写到(spill)磁盘的指定文件夹(mapred.local.dir)下的新建的一个溢出写文件。

2 写磁盘前。要partition,sort。假设有combiner,combine排序后数据。

3 等最后记录写完。合并所有溢出写文件为一个分区且排序的文件。

2.1 Reducer通过Http方式得到输出文件的分区。

2.2 TaskTracker为分区文件执行Reduce任务。复制阶段把Map输出拷贝到Reducer的内存或磁盘。一个Map任务完毕,Reduce就開始复制输出。

2.3 排序阶段合并map输出。然后走Reduce阶段。

仅仅看这个图,不看other maps。other reducers。有4个map任务。3个reducer

Reducer源代码有Shuffle定义

版权声明:本文博客原创文章,博客,未经同意,不得转载。