MapReduce中combine、partition、shuffle的作用是什么

时间:2023-03-08 18:33:55
http://www.aboutyun.com/thread-8927-1-1.html





Mapreduce在hadoop中是一个比較难以的概念。以下须要用心看,然后自己就能总结出来了。

概括:

combine和partition都是函数。中间的步骤应该仅仅有shuffle!



1.combine

combine分为map端和reduce端,作用是把同一个key的键值对合并在一起,能够自己定义的。

combine函数把一个map函数产生的<key,value>对(多个key,value)合并成一个新的<key2,value2>.将新的<key2,value2>作为输入到reduce函数中

这个value2亦可称之为values,由于有多个。这个合并的目的是为了降低网络传输。



详细实现是由Combine类。

实现combine函数,该类的主要功能是合并同样的key键。通过job.setCombinerClass()方法设置。默觉得null,不合并中间结果。实现map函数

详细调用:(下图是调用reduce,合并map的个数)



难点:不知道这个reduce和mapreduce中的reduce差别是什么?

以下简单说一下:后面慢慢琢磨:

在mapreduce中。map多,reduce少。

在reduce中因为数据量比較多。所以干脆。我们先把自己map里面的数据归类,这样到了reduce的时候就减轻了压力。



这里举个样例:

map与reduce的样例

map理解为销售人员,reduce理解为销售经理。

每一个人(map)仅仅管销售,赚了多少钱销售人员不统计。也就是说这个销售人员没有Combine,那么这个销售经理就累垮了。由于每一个人都没有统计,它须要统计全部人员卖了多少件。赚钱了多少钱。

这样是不行的。所以销售经理(reduce)为了减轻压力,每一个人(map)都必须统计自己卖了多少钱,赚了多少钱(Combine),然后经理所做的事情就是统计每一个人统计之后的结果。这样经理就轻松多了。所以Combine在map所做的事情。减轻了reduce的事情。

(这就是为什么说map的Combine干reduce的事情。相信你应该明确了)

public  static void main(String[] args)throws IOException {

        Configuration conf = new Configuration();

        Job job = new Job(conf);

        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(Mapper.class);

        job.setCombinerClass(reduce.class);

        job.setPartitionerClass(HashPartitioner.class);

        job.setReducerClass(Reducer.class);

        job.setOutputFormatClass(TextOutFormat.class);

    }

}

2.partition

partition是切割map每一个节点的结果,依照key分别映射给不同的reduce。也是能够自己定义的。这里事实上能够理解归类。

我们对于错综复杂的数据归类。比方在动物园里有牛羊鸡鸭鹅。他们都是混在一起的。可是到了晚上他们就各自牛回牛棚。羊回羊圈,鸡回鸡窝。partition的作用就是把这些数据归类。仅仅只是在敲代码的时候,mapreduce使用哈希HashPartitioner帮我们归类了。这个我们也能够自己定义。

HashPartitioner是mapreduce的默认partitioner。

计算方法是



which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks。得到当前的目的reducer。

以下在看该怎样自己定义,该怎样调用:(以下便是自己定义了一个Partition函数。红字部分是算法的核心,也就是分区的核心)

public static class Partition extends Partitioner<IntWritable, IntWritable> {

                @Override

                public int getPartition(IntWritable key, IntWritable value,

                                int numPartitions) {

                        int Maxnumber = 65223;

                        int bound = Maxnumber / numPartitions + 1;

                        int keynumber = key.get();

                        for (int i = 0; i < numPartitions; i++) {

                                if (keynumber < bound * i && keynumber >= bound * (i - 1)) {

                                        return i - 1;

                                }

                        }

                        return 0;

                }



        }

那么我们该怎样调用:(以下调用之后,你的分区函数就生效了)

public static void main(String[] args) throws IOException,

InterruptedException, ClassNotFoundException {

Configuration conf = new Configuration();

Job job = new Job(conf, "sort");

job.setJarByClass(Sort.class);

job.setMapperClass(Map.class);

job.setReducerClass(Reduce.class);

job.setPartitionerClass(Partition.class);

job.setOutputKeyClass(IntWritable.class);

job.setOutputValueClass(IntWritable.class);

FileInputFormat.setInputPaths(job, "/home/asheng/hadoop/in");

FileOutputFormat

.setOutputPath(job, new Path("/home/asheng/hadoop/out"));

job.waitForCompletion(true);

}

}

3.shuffle



shuffle就是map和reduce之间的过程。包括了两端的combine和partition。

它比較难以理解,由于我们摸不着。看不到它。它仅仅是理论存在的。并且确实存在,它属于mapreduce的框架。编程的时候。我们用不到它,它属于mapreduce框架。具体能够看通过实例让你真正明确mapreduce---填空式、分布(切割)编程

3.1shuffle的作用是

Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后。通过OutputFormat,进行输出

shuffle阶段的主要函数是fetchOutputs(),这个函数的功能就是将map阶段的输出,copy到reduce 节点本地。