MapReduce模型中的一些细节讨论

时间:2023-02-13 07:27:17

MapReduce的分析模型中,还是有些东西值得讨论和研究的:

本文讨论一些MapReduce执行过程中的细节问题,可与本人另外一篇博客相互对照学习:

接下来,进入正文(本文讨论依旧是基于Hadoop-1.0.0):

-----------------------------------------------------------

public static class IntSumReducer extends
			Reducer<Text, IntWritable, Text, IntWritable> {
		private IntWritable result = new IntWritable();
		public void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}
这代码司空见惯了,但是, 其中有一个问题,这个迭代器,是从哪里来的?

实际上,在执行我们提交的reduce方法之前,会先执行其父类,即Reducer中的run方法:

reducer.run(reducerContext);

这行代码在ReduceTask中,我们看看这个run方法:

public void run(Context context) throws IOException, InterruptedException {
		setup(context);
		// 判断是否有下一个Key和Value
		while (context.nextKey()) {
			// 处理下一个独立的key和value
			reduce(context.getCurrentKey(), context.getValues(), context);
		}
		cleanup(context);
	}

可以看到很多博客,都提到说Reducer或者Mapper中本来有一些setup的方法,可以设置一些全局变量,说的就是这Mapper或者Reducer中的setup方法,以及cleanup方法,这些方法在MapTask或者ReduceTask中只会执行一次:

而实际上,这里传入的context来源于:

org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(
				reducer,// 这就是前面拿到的Reduce的Class
				job,// job提交的相关配置
				getTaskID(), // 提交的Application此次的TaskID
				rIter, // 实质上就是收集到的所有数据的迭代器
				reduceInputKeyCounter, reduceInputValueCounter, trackedRW,
				committer, reporter, comparator,// 排序的比较器
				keyClass, valueClass);
其中,我们需要最大关注的,其实就是这个rIter,其实质就是所有数据的一个迭代器,说起来高大上,其本质就是RawKeyValueIterator:
/**
 * <code>RawKeyValueIterator</code> is an iterator used to iterate over
 * the raw keys and values during sort/merge of intermediate data. 
 */
public interface RawKeyValueIterator 

上面的方法中,实际是用的反射方法构建了一个ReduceContext,接下来,我们重点看下reduce方法的执行过程和传入参数:

reduce(context.getCurrentKey(), context.getValues(), context);

在细致讲述该方法之前,说一下ReduceContext中的一个变量:

	private ValueIterable iterable = new ValueIterable();

其实质,是此次元素的迭代器:

// 这就是返回的哪个迭代器
	protected class ValueIterable implements Iterable<VALUEIN> {
		private ValueIterator iterator = new ValueIterator();

		@Override
		public Iterator<VALUEIN> iterator() {
			return iterator;
		}
	}

而实质上,对于该元素的操作,都是基于其内部的iterator来实现的,先记住这一点,我们继续看看参数的传入:

/**
	 * Iterate through the values for the current key, reusing the same value
	 * object, which is stored in the context.
	 * 
	 * @return the series of values associated with the current key. All of the
	 *         objects returned directly and indirectly from this method are
	 *         reused.
	 */
	public Iterable<VALUEIN> getValues() throws IOException,
			InterruptedException {
		return iterable;
	}

这个方法很重要,看注释就知道,其会遍历对应于一个Key所有的value,并且生成一个迭代器,这是什么意思?就是我们眼见的reduce方法中的迭代器,并不是文件中是迭代器,本质上来说,文件中依旧是一个个key,value的成对数据,我们在真正执行reduceTask的时候,才把这些排好顺序的key,value数据拿出来,一行行读取,在这个阶段,转化成迭代器,交给了我们的reduce过程来执行:

其实本质上也是,我们可以想象,如果map端传过来的是类似于key,Iterator<value>这种的数据,对于reduce这边,进行操作其实并不是很便利的,所以很多博客中写的只是简写,我们必须清楚实质上传过来的数据,依旧是key,value的数据,而非key,iterator<value>的数据:

我们看到的reduce方法中的迭代器,是由run方法执行产生的,其产生了我们看到的迭代器:

我们接着看,从nextKey方法为切入点:

/** Start processing next unique key. */
	public boolean nextKey() throws IOException, InterruptedException {
		// 默认情况下,hasMore是true,nextKeyisSam是false
		// 直接false进行下一步
		while (hasMore && nextKeyIsSame) {
			nextKeyValue();
		}

		// 默认情况下,或者是第一个key的时候,到了这里
		if (hasMore) {
			if (inputKeyCounter != null) {
				inputKeyCounter.increment(1);
			}
			// 返回这个
			return nextKeyValue();
		} else {
			return false;
		}
	}

从最先开始的时候,我们的hasMore和nextKeyIsSame都是false,直接会执行nextKeyValue方法,要注意,这里有个while循环,直到下一个key与当前key不一致,才会截止:

/**
	 * Advance to the next key/value pair.
	 */
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		if (!hasMore) {
			key = null;
			value = null;
			return false;
		}

		// firstValue为true
		firstValue = !nextKeyIsSame;

		// 当前的key
		DataInputBuffer next = input.getKey();
		currentRawKey.set(next.getData(), next.getPosition(), next.getLength()
				- next.getPosition());
		buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
		key = keyDeserializer.deserialize(key);
		next = input.getValue();
		buffer.reset(next.getData(), next.getPosition(), next.getLength());
		value = valueDeserializer.deserialize(value);
		// 如果有接下来的value
		hasMore = input.next();
		if (hasMore) {
			next = input.getKey();
			// 判断nextKey是否跟这个一样
			nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
					currentRawKey.getLength(), next.getData(),
					next.getPosition(), next.getLength() - next.getPosition()) == 0;
		} else {
			nextKeyIsSame = false;
		}
		inputValueCounter.increment(1);
		return true;
	}

对于本方法不进行仔细的分析了,大家知道迭代器的来源即可: