MapReduce的分析模型中,还是有些东西值得讨论和研究的:
本文讨论一些MapReduce执行过程中的细节问题,可与本人另外一篇博客相互对照学习:
接下来,进入正文(本文讨论依旧是基于Hadoop-1.0.0):
-----------------------------------------------------------
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }这代码司空见惯了,但是, 其中有一个问题,这个迭代器,是从哪里来的?
实际上,在执行我们提交的reduce方法之前,会先执行其父类,即Reducer中的run方法:
reducer.run(reducerContext);
这行代码在ReduceTask中,我们看看这个run方法:
public void run(Context context) throws IOException, InterruptedException { setup(context); // 判断是否有下一个Key和Value while (context.nextKey()) { // 处理下一个独立的key和value reduce(context.getCurrentKey(), context.getValues(), context); } cleanup(context); }
可以看到很多博客,都提到说Reducer或者Mapper中本来有一些setup的方法,可以设置一些全局变量,说的就是这Mapper或者Reducer中的setup方法,以及cleanup方法,这些方法在MapTask或者ReduceTask中只会执行一次:
而实际上,这里传入的context来源于:
org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext( reducer,// 这就是前面拿到的Reduce的Class job,// job提交的相关配置 getTaskID(), // 提交的Application此次的TaskID rIter, // 实质上就是收集到的所有数据的迭代器 reduceInputKeyCounter, reduceInputValueCounter, trackedRW, committer, reporter, comparator,// 排序的比较器 keyClass, valueClass);其中,我们需要最大关注的,其实就是这个rIter,其实质就是所有数据的一个迭代器,说起来高大上,其本质就是RawKeyValueIterator:
/** * <code>RawKeyValueIterator</code> is an iterator used to iterate over * the raw keys and values during sort/merge of intermediate data. */ public interface RawKeyValueIterator
上面的方法中,实际是用的反射方法构建了一个ReduceContext,接下来,我们重点看下reduce方法的执行过程和传入参数:
reduce(context.getCurrentKey(), context.getValues(), context);
在细致讲述该方法之前,说一下ReduceContext中的一个变量:
private ValueIterable iterable = new ValueIterable();
其实质,是此次元素的迭代器:
// 这就是返回的哪个迭代器 protected class ValueIterable implements Iterable<VALUEIN> { private ValueIterator iterator = new ValueIterator(); @Override public Iterator<VALUEIN> iterator() { return iterator; } }
而实质上,对于该元素的操作,都是基于其内部的iterator来实现的,先记住这一点,我们继续看看参数的传入:
/** * Iterate through the values for the current key, reusing the same value * object, which is stored in the context. * * @return the series of values associated with the current key. All of the * objects returned directly and indirectly from this method are * reused. */ public Iterable<VALUEIN> getValues() throws IOException, InterruptedException { return iterable; }
这个方法很重要,看注释就知道,其会遍历对应于一个Key所有的value,并且生成一个迭代器,这是什么意思?就是我们眼见的reduce方法中的迭代器,并不是文件中是迭代器,本质上来说,文件中依旧是一个个key,value的成对数据,我们在真正执行reduceTask的时候,才把这些排好顺序的key,value数据拿出来,一行行读取,在这个阶段,转化成迭代器,交给了我们的reduce过程来执行:
其实本质上也是,我们可以想象,如果map端传过来的是类似于key,Iterator<value>这种的数据,对于reduce这边,进行操作其实并不是很便利的,所以很多博客中写的只是简写,我们必须清楚实质上传过来的数据,依旧是key,value的数据,而非key,iterator<value>的数据:
我们看到的reduce方法中的迭代器,是由run方法执行产生的,其产生了我们看到的迭代器:
我们接着看,从nextKey方法为切入点:
/** Start processing next unique key. */ public boolean nextKey() throws IOException, InterruptedException { // 默认情况下,hasMore是true,nextKeyisSam是false // 直接false进行下一步 while (hasMore && nextKeyIsSame) { nextKeyValue(); } // 默认情况下,或者是第一个key的时候,到了这里 if (hasMore) { if (inputKeyCounter != null) { inputKeyCounter.increment(1); } // 返回这个 return nextKeyValue(); } else { return false; } }
从最先开始的时候,我们的hasMore和nextKeyIsSame都是false,直接会执行nextKeyValue方法,要注意,这里有个while循环,直到下一个key与当前key不一致,才会截止:
/** * Advance to the next key/value pair. */ @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!hasMore) { key = null; value = null; return false; } // firstValue为true firstValue = !nextKeyIsSame; // 当前的key DataInputBuffer next = input.getKey(); currentRawKey.set(next.getData(), next.getPosition(), next.getLength() - next.getPosition()); buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength()); key = keyDeserializer.deserialize(key); next = input.getValue(); buffer.reset(next.getData(), next.getPosition(), next.getLength()); value = valueDeserializer.deserialize(value); // 如果有接下来的value hasMore = input.next(); if (hasMore) { next = input.getKey(); // 判断nextKey是否跟这个一样 nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, currentRawKey.getLength(), next.getData(), next.getPosition(), next.getLength() - next.getPosition()) == 0; } else { nextKeyIsSame = false; } inputValueCounter.increment(1); return true; }
对于本方法不进行仔细的分析了,大家知道迭代器的来源即可: