[Hadoop Code Notes] Hadoop Job Submission: Child Launches the Reduce Task

Time: 2021-11-01 22:37:06

I. Overview

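The previous post described how the TaskTracker launches a separate Java process to run a map task. Continuing from there: while the TaskRunner thread runs, it builds a java command of the form "java -D** Child address port taskid" and starts it as a separate Java process. In Child's main function, the process obtains the Task to execute from the TaskTracker through the TaskUmbilicalProtocol and calls that Task's run method. For a ReduceTask, the run method uses Java reflection to construct the Reducer and a Reducer.Context, and then calls the constructed Reducer's run method to perform the reduce operation. Unlike a map task, a reduce task needs one extra step before it can run: the map outputs must be copied from the TaskTrackers where the maps ran to the TaskTracker where the reducer runs.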

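A reduce task takes as input its own partition of the outputs of several map tasks spread across the cluster. The map tasks may finish at different times, so the reduce task starts copying a map's output as soon as that map completes. This is the copy phase of the reduce task. Concretely, several MapOutputCopier threads are started, and together they copy all of the map outputs. Once copying is complete, the reduce task enters the sort phase. In this phase the map outputs are merged by LocalFSMerger and InMemFSMergeThread, which preserve their sort order; both threads are already started during the copy phase. [In other words, several already-sorted files are combined with a merge sort.] In the reduce phase, the reduce function is called for each key of the sorted output. The output of this phase is written directly to the file system, usually HDFS. (With HDFS, the TaskTracker node is also a DataNode, so the first replica of each block is written to the local disk, i.e. data locality.)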
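To make the merge-sort idea concrete, here is a minimal sketch of a k-way merge over already-sorted runs using a min-heap. It is an illustration only, not Hadoop's Merger class, which merges IFile segments by comparing raw key bytes and takes a merge factor (io.sort.factor in the code below). The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch: merge several already-sorted runs into one sorted list. */
final class KWayMergeSketch {

  /** Heap entry: the current head of one run plus the iterator it came from. */
  private static final class Head<T> {
    final T value;
    final Iterator<T> rest;
    Head(T value, Iterator<T> rest) { this.value = value; this.rest = rest; }
  }

  static <T extends Comparable<T>> List<T> merge(List<List<T>> sortedRuns) {
    // Min-heap ordered by each run's current head element.
    PriorityQueue<Head<T>> heap =
        new PriorityQueue<>((a, b) -> a.value.compareTo(b.value));
    for (List<T> run : sortedRuns) {
      Iterator<T> it = run.iterator();
      if (it.hasNext()) heap.add(new Head<>(it.next(), it));
    }
    List<T> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      Head<T> smallest = heap.poll();        // smallest head across all runs
      out.add(smallest.value);
      if (smallest.rest.hasNext()) {         // refill from the same run
        heap.add(new Head<>(smallest.rest.next(), smallest.rest));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<List<Integer>> runs = Arrays.asList(
        Arrays.asList(1, 4, 9), Arrays.asList(2, 3, 10), Arrays.asList(5));
    System.out.println(merge(runs)); // [1, 2, 3, 4, 5, 9, 10]
  }
}
```

Each output element costs O(log k) heap work for k runs. That is why merging many sorted map outputs is much cheaper than sorting them again from scratch.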

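When a map task completes, it reports the status change to its parent TaskTracker. The TaskTracker then informs the JobTracker through the heartbeat mechanism, so the JobTracker knows the mapping between map outputs and TaskTrackers. In the reduce task, a thread (GetMapEventsThread, via its getMapCompletionEvents method) periodically asks for the locations of map outputs. The request goes over RPC to the local TaskTracker, which obtains the completion events from the JobTracker.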

II. Process Description

1. ReduceTask builds a ReduceCopier object and calls its fetchOutputs method.

2. ReduceCopier's fetchOutputs method creates several independent threads. They cooperate with one another, and each completes its own job independently.

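2.1 The GetMapEventsThread thread asks the TaskTracker for map completion events over RPC. For each completed event, it obtains the address of the server where the map task ran, i.e. the location of the MapTask output. It then builds a URL and adds it to mapLocations for the copier threads to pick up.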

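2.2 Several MapOutputCopier threads are created and started. They copy the map outputs from the remote servers to the local machine over HTTP. An output is kept in memory if it fits, and saved to a local file otherwise.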

2.3 LocalFSMerger merges the map outputs on disk.

2.4 InMemFSMergeThread merges the map outputs in memory.

3. A raw key/value iterator (RawKeyValueIterator) is built from the copied map outputs and used as the reduce input.

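4. The runNewReducer method constructs a Reducer instance and its runtime context from the configured Reducer class. It then calls the reducer's run method, which in turn invokes the user-defined reduce operation.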

5. The Reducer's run method takes a key and the collection of values for that key (of type Iterable<VALUEIN>) from the context. It calls the reducer's reduce method to process them.

6. The Reducer's reduce method is the user-defined data-processing method. It is normally the only method the user needs to define.
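Steps 3 to 6 boil down to walking a key-sorted stream and calling reduce once per key with all of that key's values. The sketch below shows that grouping on plain Java collections. It is hypothetical, and it differs from Hadoop in one detail: Hadoop's reduce context reads from a RawKeyValueIterator and uses the comparator from job.getOutputValueGroupingComparator() to decide where a key group ends, whereas this sketch simply uses equals().

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical sketch: group a key-sorted stream and call "reduce" once per key. */
final class GroupAndReduceSketch {

  static <K, V, R> Map<K, R> run(Iterator<Map.Entry<K, V>> sorted,
                                 BiFunction<K, Iterable<V>, R> reduce) {
    Map<K, R> output = new LinkedHashMap<>();
    Map.Entry<K, V> pending = sorted.hasNext() ? sorted.next() : null;
    while (pending != null) {
      K key = pending.getKey();
      List<V> values = new ArrayList<>();
      // The input is sorted by key, so all values for this key are adjacent.
      while (pending != null && pending.getKey().equals(key)) {
        values.add(pending.getValue());
        pending = sorted.hasNext() ? sorted.next() : null;
      }
      output.put(key, reduce.apply(key, values));   // one reduce call per key
    }
    return output;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> pairs = Arrays.asList(
        new SimpleEntry<String, Integer>("a", 1),
        new SimpleEntry<String, Integer>("a", 2),
        new SimpleEntry<String, Integer>("b", 5));
    Map<String, Integer> sums = run(pairs.iterator(), (k, vs) -> {
      int s = 0;
      for (int v : vs) s += v;
      return s;
    });
    System.out.println(sums); // {a=3, b=5}
  }
}
```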


III. Code Details

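1. Child's main method. Every task runs in its own separate process, and this method is the entry point of those processes. Reduce tasks are started from this main function exactly as map tasks are, so it is not described again here. For details, see the previous post on Child launching the map task.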

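2. ReduceTask's run method. It is called in the Child process and executes the user-defined reduce operation. The first part of the logic resembles MapTask. It starts a thread that reports progress to the TaskTracker through TaskUmbilicalProtocol. It then runs a different method depending on the kind of task, such as jobCleanup, jobSetup or taskCleanup. For background on this part, see the detailed explanation of the JobTracker's heartbeat method in the post on how the TaskTracker obtains tasks. Unlike a map task, before the reduce task runs, the map outputs must be copied from the TaskTrackers where the maps ran to the TaskTracker where the reducer runs.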

@SuppressWarnings("unchecked")
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
job.setBoolean("mapred.skip.on", isSkipping());
if (isMapOrReduce()) {
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
}
// start thread that will handle communication with parent
TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
reporter.startCommunicationThread();
boolean useNewApi = job.getUseNewReducer();
initialize(job, getJobID(), reporter, useNewApi);
// check if it is a cleanupJobTask
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
}
// Initialize the codec
codec = initCodec();
boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
// Unless running in local (non-distributed) mode, start a ReduceCopier to copy
// the map outputs, i.e. the reduce input.
if (!isLocal) {
reduceCopier = new ReduceCopier(umbilical, job, reporter);
if (!reduceCopier.fetchOutputs()) {
if(reduceCopier.mergeThrowable instanceof FSError) {
LOG.error("Task: " + getTaskID() + " - FSError: " +
StringUtils.stringifyException(reduceCopier.mergeThrowable));
umbilical.fsError(getTaskID(),
reduceCopier.mergeThrowable.getMessage());
}
throw new IOException("Task: " + getTaskID() +
" - The reduce copier failed", reduceCopier.mergeThrowable);
}
}
copyPhase.complete();
// Copying is complete; enter the sort phase.
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
final FileSystem rfs = FileSystem.getLocal(job).getRaw();
RawKeyValueIterator rIter = isLocal
? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
!conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
new Path(getTaskID().toString()), job.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null)
: reduceCopier.createKVIterator(job, rfs, reporter);
// free up the data structures
mapOutputFilesOnDisk.clear();
sortPhase.complete(); // sort is complete
setPhase(TaskStatus.Phase.REDUCE);
statusUpdate(umbilical);
Class keyClass = job.getMapOutputKeyClass();
Class valueClass = job.getMapOutputValueClass();
RawComparator comparator = job.getOutputValueGroupingComparator();
if (useNewApi) {
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
} else {
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
done(umbilical, reporter);
}
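run() registers three progress phases (copy, sort, reduce). fetchOutputs() later adds one sub-phase per map output to the copy phase. The sketch below shows how such phases could translate into a single progress number. It assumes that all phases at the same level carry equal weight; treat that weighting as an assumption about Hadoop's Progress class. PhaseProgressSketch and its methods are hypothetical names.

```java
/**
 * Hypothetical sketch of phase-weighted task progress. Assumes the three top-level
 * phases (copy, sort, reduce) weigh the same and the copy phase is split into one
 * equal sub-phase per map output. Not Hadoop's Progress API.
 */
final class PhaseProgressSketch {
  static final int COPY = 0, SORT = 1, REDUCE = 2;
  static final int NUM_PHASES = 3;

  /** Overall progress in [0, 1] given the current phase and the fraction done inside it. */
  static float overall(int phase, float progressInPhase) {
    float perPhase = 1.0f / NUM_PHASES;
    return perPhase * phase + perPhase * progressInPhase;
  }

  /** Copy-phase progress when each of numMaps outputs is one equal sub-phase. */
  static float copyProgress(int copied, int numMaps) {
    return numMaps == 0 ? 1.0f : (float) copied / numMaps;
  }

  public static void main(String[] args) {
    // 5 of 10 map outputs copied: halfway through the first of three phases.
    System.out.println(overall(COPY, copyProgress(5, 10)));
  }
}
```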

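3. ReduceCopier's fetchOutputs method. This method copies the map outputs to the reduce side. As the code shows, it starts a LocalFSMerger, an InMemFSMergeThread, a GetMapEventsThread and several MapOutputCopier threads. These independent threads cooperate with one another, and each completes its own job.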

public boolean fetchOutputs() throws IOException {
int totalFailures = 0;
int numInFlight = 0, numCopied = 0;
DecimalFormat mbpsFormat = new DecimalFormat("0.00");
final Progress copyPhase =
reduceTask.getProgress().phase();
LocalFSMerger localFSMergerThread = null;
InMemFSMergeThread inMemFSMergeThread = null;
GetMapEventsThread getMapEventsThread = null;
for (int i = 0; i < numMaps; i++) {
copyPhase.addPhase(); // add sub-phase per file
}
// 1) Create numCopiers MapOutputCopier threads as configured (5 by default).
// These MapOutputCopier threads do the actual copying.
copiers = new ArrayList<MapOutputCopier>(numCopiers);
// start all the copying threads
for (int i=0; i < numCopiers; i++) {
MapOutputCopier copier = new MapOutputCopier(conf, reporter);
copiers.add(copier);
copier.start();
}
// 2) Start the on-disk merge thread (see LocalFSMerger below)
localFSMergerThread = new LocalFSMerger((LocalFileSystem)localFileSys);
// 3) Start the in-memory merge thread (see InMemFSMergeThread below)
inMemFSMergeThread = new InMemFSMergeThread();
localFSMergerThread.start();
inMemFSMergeThread.start();
// 4) Start the thread that fetches map completion events
getMapEventsThread = new GetMapEventsThread();
getMapEventsThread.start();
// start the clock for bandwidth measurement
long startTime = System.currentTimeMillis();
long currentTime = startTime;
long lastProgressTime = startTime;
long lastOutputTime = 0;
// loop until we get all required outputs
// 5) While fewer than numMaps outputs have been copied (copiedMapOutputs), copying is
// not finished, so keep looping. The loop periodically logs how many map outputs are
// still needed and how many copies are in flight.
while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {
currentTime = System.currentTimeMillis();
boolean logNow = false;
if (currentTime - lastOutputTime > MIN_LOG_TIME) {
lastOutputTime = currentTime;
logNow = true;
}
if (logNow) {
LOG.info(reduceTask.getTaskID() + " Need another "
+ (numMaps - copiedMapOutputs.size()) + " map output(s) "
+ "where " + numInFlight + " is already in progress");
}
// Put the hash entries for the failed fetches.
Iterator<MapOutputLocation> locItr = retryFetches.iterator();
while (locItr.hasNext()) {
MapOutputLocation loc = locItr.next();
List<MapOutputLocation> locList =
mapLocations.get(loc.getHost());
// Check if the list exists. Map output location mapping is cleared
// once the jobtracker restarts and is rebuilt from scratch.
// Note that map-output-location mapping will be recreated and hence
// we continue with the hope that we might find some locations
// from the rebuild map.
if (locList != null) {
// Add to the beginning of the list so that this map is
//tried again before the others and we can hasten the
//re-execution of this map should there be a problem
locList.add(0, loc);
}
}
if (retryFetches.size() > 0) {
LOG.info(reduceTask.getTaskID() + ": " +
"Got " + retryFetches.size() +
" map-outputs from previous failures");
}
// clear the "failed" fetches hashmap
retryFetches.clear();
// now walk through the cache and schedule what we can
int numScheduled = 0;
int numDups = 0;
synchronized (scheduledCopies) {
// Randomize the map output locations to prevent
// all reduce-tasks swamping the same tasktracker
List<String> hostList = new ArrayList<String>();
hostList.addAll(mapLocations.keySet());
Collections.shuffle(hostList, this.random);
Iterator<String> hostsItr = hostList.iterator();
while (hostsItr.hasNext()) {
String host = hostsItr.next();
List<MapOutputLocation> knownOutputsByLoc =
mapLocations.get(host);
// Check if the list exists. Map output location mapping is
// cleared once the jobtracker restarts and is rebuilt from
// scratch.
// Note that map-output-location mapping will be recreated and
// hence we continue with the hope that we might find some
// locations from the rebuild map and add then for fetching.
if (knownOutputsByLoc == null || knownOutputsByLoc.size() == 0) {
continue;
}
// Identify duplicate hosts here
if (uniqueHosts.contains(host)) {
numDups += knownOutputsByLoc.size();
continue;
}
Long penaltyEnd = penaltyBox.get(host);
boolean penalized = false;
if (penaltyEnd != null) {
if (currentTime < penaltyEnd.longValue()) {
penalized = true;
} else {
penaltyBox.remove(host);
}
}
if (penalized)
continue;
synchronized (knownOutputsByLoc) {
locItr = knownOutputsByLoc.iterator();
while (locItr.hasNext()) {
MapOutputLocation loc = locItr.next();
// Do not schedule fetches from OBSOLETE maps
if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
locItr.remove();
continue;
}
uniqueHosts.add(host);
scheduledCopies.add(loc);
locItr.remove(); // remove from knownOutputs
numInFlight++;
numScheduled++;
break; // we have a map from this host
}
}
}
scheduledCopies.notifyAll();
}
if (numScheduled > 0 || logNow) {
LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled +
" outputs (" + penaltyBox.size() +
" slow hosts and " + numDups + " dup hosts)");
}
if (penaltyBox.size() > 0 && logNow) {
LOG.info("Penalized(slow) Hosts: ");
for (String host : penaltyBox.keySet()) {
LOG.info(host + " Will be considered after: " +
((penaltyBox.get(host) - currentTime)/1000) + " seconds.");
}
}
// if we have no copies in flight and we can't schedule anything
// new, just wait for a bit
try {
if (numInFlight == 0 && numScheduled == 0) {
// we should indicate progress as we don't want TT to think
// we're stuck and kill us
reporter.progress();
Thread.sleep(5000);
}
} catch (InterruptedException e) { } // IGNORE
while (numInFlight > 0 && mergeThrowable == null) {
LOG.debug(reduceTask.getTaskID() + " numInFlight = " +
numInFlight);
//the call to getCopyResult will either
//1) return immediately with a null or a valid CopyResult object,
// or
//2) if the numInFlight is above maxInFlight, return with a
// CopyResult object after getting a notification from a
// fetcher thread,
//So, when getCopyResult returns null, we can be sure that
//we aren't busy enough and we should go and get more mapcompletion
//events from the tasktracker
CopyResult cr = getCopyResult(numInFlight);
if (cr == null) {
break;
}
if (cr.getSuccess()) { // a successful copy
numCopied++;
lastProgressTime = System.currentTimeMillis();
reduceShuffleBytes.increment(cr.getSize());
long secsSinceStart =
(System.currentTimeMillis()-startTime)/1000+1;
float mbs = ((float)reduceShuffleBytes.getCounter())/(1024*1024);
float transferRate = mbs/secsSinceStart;
copyPhase.startNextPhase();
copyPhase.setStatus("copy (" + numCopied + " of " + numMaps
+ " at " +
mbpsFormat.format(transferRate) + " MB/s)");
// Note successful fetch for this mapId to invalidate
// (possibly) old fetch-failures
fetchFailedMaps.remove(cr.getLocation().getTaskId());
} else if (cr.isObsolete()) {
//ignore
LOG.info(reduceTask.getTaskID() +
" Ignoring obsolete copy result for Map Task: " +
cr.getLocation().getTaskAttemptId() + " from host: " +
cr.getHost());
} else {
retryFetches.add(cr.getLocation()); // note the failed-fetch
TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
TaskID mapId = cr.getLocation().getTaskId();
totalFailures++;
Integer noFailedFetches =
mapTaskToFailedFetchesMap.get(mapTaskId);
noFailedFetches =
(noFailedFetches == null) ? 1 : (noFailedFetches + 1);
mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
LOG.info("Task " + getTaskID() + ": Failed fetch #" +
noFailedFetches + " from " + mapTaskId);
// did the fetch fail too many times?
// using a hybrid technique for notifying the jobtracker.
// a. the first notification is sent after max-retries
// b. subsequent notifications are sent after 2 retries.
if ((noFailedFetches >= maxFetchRetriesPerMap)
&& ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
synchronized (ReduceTask.this) {
taskStatus.addFetchFailedMap(mapTaskId);
LOG.info("Failed to fetch map-output from " + mapTaskId +
" even after MAX_FETCH_RETRIES_PER_MAP retries... "
+ " reporting to the JobTracker");
}
}
// note unique failed-fetch maps
if (noFailedFetches == maxFetchRetriesPerMap) {
fetchFailedMaps.add(mapId);
// did we have too many unique failed-fetch maps?
// and did we fail on too many fetch attempts?
// and did we progress enough
// or did we wait for too long without any progress?
// check if the reducer is healthy
boolean reducerHealthy =
(((float)totalFailures / (totalFailures + numCopied))
< MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
// check if the reducer has progressed enough
boolean reducerProgressedEnough =
(((float)numCopied / numMaps)
>= MIN_REQUIRED_PROGRESS_PERCENT);
// check if the reducer is stalled for a long time
// duration for which the reducer is stalled
int stallDuration =
(int)(System.currentTimeMillis() - lastProgressTime);
// duration for which the reducer ran with progress
int shuffleProgressDuration =
(int)(lastProgressTime - startTime);
// min time the reducer should run without getting killed
int minShuffleRunDuration =
(shuffleProgressDuration > maxMapRuntime)
? shuffleProgressDuration
: maxMapRuntime;
boolean reducerStalled =
(((float)stallDuration / minShuffleRunDuration)
>= MAX_ALLOWED_STALL_TIME_PERCENT);
// kill if not healthy and has insufficient progress
if ((fetchFailedMaps.size() >= maxFailedUniqueFetches ||
fetchFailedMaps.size() == (numMaps - copiedMapOutputs.size()))
&& !reducerHealthy
&& (!reducerProgressedEnough || reducerStalled)) {
LOG.fatal("Shuffle failed with too many fetch failures " +
"and insufficient progress! " +
"Killing task " + getTaskID() + ".");
umbilical.shuffleError(getTaskID(),
"Exceeded MAX_FAILED_UNIQUE_FETCHES;"
+ " bailing-out.");
}
}
// back off exponentially until num_retries <= max_retries
// back off by max_backoff/2 on subsequent failed attempts
currentTime = System.currentTimeMillis();
int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap
? BACKOFF_INIT
* (1 << (noFailedFetches - 1))
: (this.maxBackoff * 1000 / 2);
penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
LOG.warn(reduceTask.getTaskID() + " adding host " +
cr.getHost() + " to penalty box, next contact in " +
(currentBackOff/1000) + " seconds");
}
uniqueHosts.remove(cr.getHost());
numInFlight--;
}
}
// all done, inform the copiers to exit
exitGetMapEvents= true;
try {
getMapEventsThread.join();
LOG.info("getMapsEventsThread joined.");
} catch (Throwable t) {
LOG.info("getMapsEventsThread threw an exception: " +
StringUtils.stringifyException(t));
}
synchronized (copiers) {
synchronized (scheduledCopies) {
for (MapOutputCopier copier : copiers) {
copier.interrupt();
}
copiers.clear();
}
}
// copiers are done, exit and notify the waiting merge threads
synchronized (mapOutputFilesOnDisk) {
exitLocalFSMerge = true;
mapOutputFilesOnDisk.notify();
}
ramManager.close();
// Do a merge of in-memory files (if there are any)
if (mergeThrowable == null) {
try {
// Wait for the on-disk merge to complete
localFSMergerThread.join();
LOG.info("Interleaved on-disk merge complete: " +
mapOutputFilesOnDisk.size() + " files left.");
// wait for an ongoing merge (if it is in flight) to complete
inMemFSMergeThread.join();
LOG.info("In-memory merge complete: " +
mapOutputsFilesInMemory.size() + " files left.");
} catch (Throwable t) {
LOG.warn(reduceTask.getTaskID() +
" Final merge of the inmemory files threw an exception: " +
StringUtils.stringifyException(t));
// check if the last merge generated an error
if (mergeThrowable != null) {
mergeThrowable = t;
}
return false;
}
}
return mergeThrowable == null && copiedMapOutputs.size() == numMaps;
}
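The retry bookkeeping in fetchOutputs() is easier to follow with numbers. The sketch below reproduces two of its formulas. The first is the penalty-box delay: exponential up to maxFetchRetriesPerMap, then half of the maximum backoff. The second is the rule that reports a failing map to the JobTracker at the max-retries mark and on every second failure after that. BACKOFF_INIT = 4000 ms and a 300-second maximum backoff are assumed values for illustration, and the class name is hypothetical.

```java
/**
 * Hypothetical sketch of the retry arithmetic in fetchOutputs(). The constant values
 * are assumptions; the two formulas follow the code above.
 */
final class FetchBackoffSketch {
  static final int BACKOFF_INIT = 4000;   // ms, assumed value

  /** Penalty-box delay in ms after the n-th failed fetch of one map output. */
  static int backoffMillis(int noFailedFetches, int maxFetchRetriesPerMap,
                           int maxBackoffSeconds) {
    return noFailedFetches <= maxFetchRetriesPerMap
        ? BACKOFF_INIT * (1 << (noFailedFetches - 1))   // 4 s, 8 s, 16 s, ...
        : maxBackoffSeconds * 1000 / 2;                   // afterwards: half the max
  }

  /** Report to the JobTracker first at max retries, then on every second failure. */
  static boolean reportToJobTracker(int noFailedFetches, int maxFetchRetriesPerMap) {
    return noFailedFetches >= maxFetchRetriesPerMap
        && (noFailedFetches - maxFetchRetriesPerMap) % 2 == 0;
  }

  public static void main(String[] args) {
    int maxRetries = 4;
    for (int n = 1; n <= 7; n++) {
      System.out.println("failure #" + n + ": wait "
          + backoffMillis(n, maxRetries, 300) / 1000 + "s, report="
          + reportToJobTracker(n, maxRetries));
    }
  }
}
```

With maxFetchRetriesPerMap = 4, the host is penalized for 4, 8, 16 and 32 seconds, then 150 seconds for each further failure. The JobTracker hears about failures #4, #6, and so on.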


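4. The MapOutputCopier thread's run method. It takes locations from scheduledCopies (a List<MapOutputLocation>) and calls copyOutput to copy each one. The map output is copied over HTTP from the remote server to the local machine. It is kept in memory if it fits, and saved to a local file otherwise.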

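public void run() {
while (true) {
try {
MapOutputLocation loc = null;
long size = -1;
synchronized (scheduledCopies) {
while (scheduledCopies.isEmpty()) {
scheduledCopies.wait(); // wait until fetchOutputs() schedules a location
}
loc = scheduledCopies.remove(0);
}
try {
start(loc);
size = copyOutput(loc); // bytes copied, or CopyResult.OBSOLETE
} catch (IOException e) { // size stays -1, so finish() records a failed copy
} finally {
finish(size); // hand the CopyResult back to fetchOutputs()
}
} catch (InterruptedException e) {
break; // fetchOutputs() interrupts the copiers once all outputs are fetched
}
}
if (decompressor != null) {
CodecPool.returnDecompressor(decompressor);
}
}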
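The copier threads and fetchOutputs() communicate through scheduledCopies using the classic wait()/notifyAll() pattern. Below is a simplified, self-contained sketch of that hand-off. Strings stand in for MapOutputLocation, "copying" just records the item, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

/** Hypothetical sketch of the scheduledCopies hand-off between scheduler and copiers. */
final class CopierHandOffSketch {
  private final List<String> scheduledCopies = new LinkedList<>();
  final List<String> copied = Collections.synchronizedList(new ArrayList<>());

  /** Scheduler side: add a location and wake up waiting copiers. */
  void schedule(String loc) {
    synchronized (scheduledCopies) {
      scheduledCopies.add(loc);
      scheduledCopies.notifyAll();
    }
  }

  /** Copier side: wait while empty, take the head, "copy" it; exit when interrupted. */
  Thread startCopier() {
    Thread t = new Thread(() -> {
      while (true) {
        String loc;
        try {
          synchronized (scheduledCopies) {
            while (scheduledCopies.isEmpty()) {
              scheduledCopies.wait();       // releases the monitor while waiting
            }
            loc = scheduledCopies.remove(0);
          }
        } catch (InterruptedException e) {
          return;                           // the owner interrupted us: all done
        }
        copied.add(loc);                    // stand-in for copyOutput(loc)
      }
    });
    t.start();
    return t;
  }

  /** Polls until n items were copied or the timeout expires. */
  boolean awaitCopied(int n, long timeoutMs) {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (copied.size() < n && System.currentTimeMillis() < deadline) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return copied.size() >= n;
  }

  /** Interrupts the copiers (as fetchOutputs does) and waits for them to exit. */
  static boolean stopAll(List<Thread> copiers) {
    for (Thread t : copiers) t.interrupt();
    try {
      for (Thread t : copiers) t.join(2000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    for (Thread t : copiers) if (t.isAlive()) return false;
    return true;
  }
}
```

Every copier waits on the same monitor, so notifyAll() after each add wakes all of them. The copiers that lose the race for the new item re-check isEmpty() and go back to waiting.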

5. MapOutputCopier's copyOutput method. It copies a map output from the TaskTracker where the map ran to the TaskTracker where the reduce task runs.

private long copyOutput(MapOutputLocation loc
) throws IOException, InterruptedException {
// Skip this location if its output has already been copied or the map attempt is obsolete.
if (copiedMapOutputs.contains(loc.getTaskId()) ||
obsoleteMapIds.contains(loc.getTaskAttemptId())) {
return CopyResult.OBSOLETE;
}
TaskAttemptID reduceId = reduceTask.getTaskID();
Path filename = new Path("/" + TaskTracker.getIntermediateOutputDir(
reduceId.getJobID().toString(),
reduceId.toString())
+ "/map_" +
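loc.getTaskId().getId() + ".out");
// A temporary file, unique to this copier thread, to copy the map output into.
Path tmpMapOutput = new Path(filename+"-"+id);
// Copy the map output; the last argument is this reduce task's partition number.
MapOutput mapOutput = getMapOutput(loc, tmpMapOutput,
reduceId.getTaskID().getId());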
if (mapOutput == null) {
throw new IOException("Failed to fetch map-output for " +
loc.getTaskAttemptId() + " from " +
loc.getHost());
}
// The size of the map-output
long bytes = mapOutput.compressedSize;
// lock the ReduceTask while the map output is registered
synchronized (ReduceTask.this) {
if (copiedMapOutputs.contains(loc.getTaskId())) {
mapOutput.discard();
return CopyResult.OBSOLETE;
}
// Process the map output. If it is held in memory, add it to mapOutputsFilesInMemory
// (a Collections.synchronizedList(new LinkedList<MapOutput>())); otherwise it was
// stored in a temporary file, so rename that file to the final output file.
if (mapOutput.inMemory) {
// Save it in the synchronized list of map-outputs
mapOutputsFilesInMemory.add(mapOutput);
} else {
tmpMapOutput = mapOutput.file;
filename = new Path(tmpMapOutput.getParent(), filename.getName());
if (!localFileSys.rename(tmpMapOutput, filename)) {
localFileSys.delete(tmpMapOutput, true);
bytes = -1;
throw new IOException("Failed to rename map output " +
tmpMapOutput + " to " + filename);
}
synchronized (mapOutputFilesOnDisk) {
addToMapOutputFilesOnDisk(localFileSys.getFileStatus(filename));
}
}
// Note that we successfully copied the map-output
noteCopiedMapOutput(loc.getTaskId());
}
return bytes;
}
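copyOutput() first fetches into a temporary file named after the final file plus the copier's id. Only after the fetch has succeeded does it rename that file to map_<n>.out, so the mergers never see a half-written output. Below is a sketch of the same pattern using java.nio.file rather than Hadoop's LocalFileSystem. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical sketch of "write to a per-copier temp file, then rename". */
final class TempThenRenameSketch {
  static Path publish(Path dir, int mapId, int copierId, byte[] data) throws IOException {
    Path finalFile = dir.resolve("map_" + mapId + ".out");
    Path tmpFile = dir.resolve(finalFile.getFileName() + "-" + copierId);
    Files.write(tmpFile, data);              // the final name does not exist yet
    try {
      // Same directory, so the rename stays on one file system.
      return Files.move(tmpFile, finalFile, StandardCopyOption.ATOMIC_MOVE);
    } catch (IOException e) {
      Files.deleteIfExists(tmpFile);         // mirror the delete-on-failed-rename above
      throw new IOException("Failed to rename map output " + tmpFile + " to " + finalFile, e);
    }
  }
}
```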

6. ReduceCopier.MapOutputCopier's getMapOutput method. This method performs the actual copy: it fetches the map output from the remote server to the local machine over HTTP.

private MapOutput getMapOutput(MapOutputLocation mapOutputLoc,
Path filename, int reduce)
throws IOException, InterruptedException {
// Open a connection to the remote server's map-output URL.
URLConnection connection =
mapOutputLoc.getOutputLocation().openConnection();
InputStream input = getInputStream(connection, STALLED_COPY_TIMEOUT,
DEFAULT_READ_TIMEOUT);
// Get the id of the map attempt that produced this output from the HTTP header
TaskAttemptID mapId = null;
mapId = TaskAttemptID.forName(connection.getHeaderField(FROM_MAP_TASK));
TaskAttemptID expectedMapId = mapOutputLoc.getTaskAttemptId();
if (!mapId.equals(expectedMapId)) {
LOG.warn("data from wrong map:" + mapId +
" arrived to reduce task " + reduce +
", where as expected map output should be from " + expectedMapId);
return null;
}
long decompressedLength =
Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));
long compressedLength =
Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
if (compressedLength < 0 || decompressedLength < 0) {
LOG.warn(getName() + " invalid lengths in map output header: id: " +
mapId + " compressed len: " + compressedLength +
", decompressed len: " + decompressedLength);
return null;
}
int forReduce =
(int)Integer.parseInt(connection.getHeaderField(FOR_REDUCE_TASK));
if (forReduce != reduce) {
LOG.warn("data for the wrong reduce: " + forReduce +
" with compressed len: " + compressedLength +
", decompressed len: " + decompressedLength +
" arrived to reduce task " + reduce);
return null;
}
LOG.info("header: " + mapId + ", compressed len: " + compressedLength +
", decompressed len: " + decompressedLength);
// Check whether the map output fits in memory, to decide whether to shuffle it in
// memory or on disk. The two paths build the MapOutput with different constructors,
// so the resulting object's inMemory flag differs.
boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength);
// Shuffle
MapOutput mapOutput = null;
if (shuffleInMemory) {
LOG.info("Shuffling " + decompressedLength + " bytes (" +
compressedLength + " raw bytes) " +
"into RAM from " + mapOutputLoc.getTaskAttemptId());
mapOutput = shuffleInMemory(mapOutputLoc, connection, input,
(int)decompressedLength,
(int)compressedLength);
} else {
LOG.info("Shuffling " + decompressedLength + " bytes (" +
compressedLength + " raw bytes) " +
"into Local-FS from " + mapOutputLoc.getTaskAttemptId());
mapOutput = shuffleToDisk(mapOutputLoc, input, filename,
compressedLength);
}
return mapOutput;
}
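The memory-versus-disk choice hinges on ramManager.canFitInMemory(decompressedLength). The sketch below models that check under assumptions about Hadoop 0.20's ShuffleRamManager as we understand it, so verify them against your version. The shuffle buffer is taken to be a fraction of the child JVM heap (mapred.job.shuffle.input.buffer.percent, default 0.70), capped at Integer.MAX_VALUE. A single map output is taken to be allowed at most 25% of that buffer. The class name is hypothetical.

```java
/**
 * Hypothetical sketch of the in-memory vs on-disk shuffle decision. The 0.25 single-output
 * fraction and the heap-fraction buffer size are assumptions, not confirmed Hadoop values.
 */
final class ShuffleInMemoryDecisionSketch {
  static final float MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION = 0.25f;  // assumed value

  /** Shuffle buffer size: a fraction of the heap, capped at Integer.MAX_VALUE bytes. */
  static long bufferBytes(long maxHeapBytes, float inputBufferPercent) {
    return Math.min((long) (maxHeapBytes * (double) inputBufferPercent), Integer.MAX_VALUE);
  }

  /** True if a map output of this decompressed size would be shuffled into RAM. */
  static boolean canFitInMemory(long decompressedLength, long bufferBytes) {
    long maxSingleShuffleLimit = (long) (bufferBytes * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
    return decompressedLength < Integer.MAX_VALUE
        && decompressedLength < maxSingleShuffleLimit;
  }

  public static void main(String[] args) {
    long buffer = bufferBytes(Runtime.getRuntime().maxMemory(), 0.70f);
    System.out.println("buffer=" + buffer + " bytes, 10 MB output in RAM? "
        + canFitInMemory(10L << 20, buffer));
  }
}
```

Under these assumptions, a 200 MB heap gives a 140 MB buffer, and any single map output of 35 MB or more goes to disk.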

7. ReduceTask.ReduceCopier.MapOutputCopier's shuffleInMemory method. As the previous method shows, it is called when the map output can be held in memory.

private MapOutput shuffleInMemory(MapOutputLocation mapOutputLoc,
URLConnection connection,
InputStream input,
int mapOutputLength,
int compressedLength)
throws IOException, InterruptedException {
// A checksummed input stream for reading the MapReduce intermediate file format (IFile).
IFileInputStream checksumIn =
new IFileInputStream(input,compressedLength);
input = checksumIn;
// If the output is compressed, wrap the stream in a decompressing stream created by the codec.
if (codec != null) {
decompressor.reset();
input = codec.createInputStream(input, decompressor);
}
// Copy the map output into an in-memory buffer.
byte[] shuffleData = new byte[mapOutputLength];
MapOutput mapOutput =
new MapOutput(mapOutputLoc.getTaskId(),
mapOutputLoc.getTaskAttemptId(), shuffleData, compressedLength);
int bytesRead = 0;
try {
int n = input.read(shuffleData, 0, shuffleData.length);
while (n > 0) {
bytesRead += n;
shuffleClientMetrics.inputBytes(n);
// indicate we're making progress
reporter.progress();
n = input.read(shuffleData, bytesRead,
(shuffleData.length-bytesRead));
}
LOG.info("Read " + bytesRead + " bytes from map-output for " +
mapOutputLoc.getTaskAttemptId());
input.close();
} catch (IOException ioe) {
LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(),
ioe);
// Inform the ram-manager
ramManager.closeInMemoryFile(mapOutputLength);
ramManager.unreserve(mapOutputLength);
// Discard the map-output
try {
mapOutput.discard();
} catch (IOException ignored) {
LOG.info("Failed to discard map-output from " +
mapOutputLoc.getTaskAttemptId(), ignored);
}
mapOutput = null;
// Close the streams
IOUtils.cleanup(LOG, input);
// Re-throw
throw ioe;
}
// Close the in-memory file
ramManager.closeInMemoryFile(mapOutputLength);
// Sanity check
if (bytesRead != mapOutputLength) {
// Inform the ram-manager
ramManager.unreserve(mapOutputLength);
// Discard the map-output
try {
mapOutput.discard();
} catch (IOException ignored) {
// IGNORED because we are cleaning up
LOG.info("Failed to discard map-output from " +
mapOutputLoc.getTaskAttemptId(), ignored);
}
mapOutput = null;
throw new IOException("Incomplete map output received for " +
mapOutputLoc.getTaskAttemptId() + " from " +
mapOutputLoc.getOutputLocation() + " (" +
bytesRead + " instead of " +
mapOutputLength + ")"
);
}
// TODO: Remove this after a 'fix' for HADOOP-3647
if (mapOutputLength > 0) {
DataInputBuffer dib = new DataInputBuffer();
dib.reset(shuffleData, 0, shuffleData.length);
LOG.info("Rec #1 from " + mapOutputLoc.getTaskAttemptId() + " -> (" +
WritableUtils.readVInt(dib) + ", " +
WritableUtils.readVInt(dib) + ") from " +
mapOutputLoc.getHost());
}
return mapOutput;
}
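IFileInputStream wraps the HTTP stream so that corrupted shuffle data is caught by a checksum instead of being silently merged. The sketch below shows the general idea with java.util.zip.CRC32. The payload is followed by a 4-byte CRC, and the reader recomputes the CRC and compares the two. This framing is an illustrative assumption, not a byte-for-byte copy of the IFile format, and the names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.zip.CRC32;

/** Hypothetical sketch: payload followed by a 4-byte CRC32 trailer, verified on read. */
final class TrailingCrcSketch {
  static byte[] withTrailingCrc(byte[] payload) throws IOException {
    CRC32 crc = new CRC32();
    crc.update(payload, 0, payload.length);
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.write(payload);
    out.writeInt((int) crc.getValue());      // 4-byte big-endian trailer
    out.flush();
    return bytes.toByteArray();
  }

  /** Returns the payload, or throws if the trailer does not match. */
  static byte[] verify(byte[] framed) throws IOException {
    if (framed.length < 4) throw new IOException("too short for a checksum");
    int dataLength = framed.length - 4;      // total length minus checksum size
    CRC32 crc = new CRC32();
    crc.update(framed, 0, dataLength);
    int expected = ByteBuffer.wrap(framed, dataLength, 4).getInt();
    if ((int) crc.getValue() != expected) throw new IOException("Checksum error");
    return Arrays.copyOf(framed, dataLength);
  }
}
```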

8. ReduceTask.ReduceCopier.MapOutputCopier's shuffleToDisk method copies the map output to the local disk. It is called when the map output cannot be held in memory.

private MapOutput shuffleToDisk(MapOutputLocation mapOutputLoc,
InputStream input,
Path filename,
long mapOutputLength)
throws IOException {
// Find out a suitable location for the output on local-filesystem
Path localFilename =
lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(),
mapOutputLength, conf);
MapOutput mapOutput =
new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(),
conf, localFileSys.makeQualified(localFilename),
mapOutputLength);
// Copy data to local-disk
OutputStream output = null;
long bytesRead = 0;
try {
output = rfs.create(localFilename);
byte[] buf = new byte[64 * 1024];
int n = input.read(buf, 0, buf.length);
while (n > 0) {
bytesRead += n;
shuffleClientMetrics.inputBytes(n);
output.write(buf, 0, n);
// indicate we're making progress
reporter.progress();
n = input.read(buf, 0, buf.length);
}
LOG.info("Read " + bytesRead + " bytes from map-output for " +
mapOutputLoc.getTaskAttemptId());
output.close();
input.close();
} catch (IOException ioe) {
LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(),
ioe);
// Discard the map-output
try {
mapOutput.discard();
} catch (IOException ignored) {
LOG.info("Failed to discard map-output from " +
mapOutputLoc.getTaskAttemptId(), ignored);
}
mapOutput = null;
// Close the streams
IOUtils.cleanup(LOG, input, output);
// Re-throw
throw ioe;
}
// Sanity check
if (bytesRead != mapOutputLength) {
try {
mapOutput.discard();
} catch (Exception ioe) {
// IGNORED because we are cleaning up
LOG.info("Failed to discard map-output from " +
mapOutputLoc.getTaskAttemptId(), ioe);
} catch (Throwable t) {
String msg = getTaskID() + " : Failed in shuffle to disk :"
+ StringUtils.stringifyException(t);
reportFatalError(getTaskID(), t, msg);
}
mapOutput = null;
throw new IOException("Incomplete map output received for " +
mapOutputLoc.getTaskAttemptId() + " from " +
mapOutputLoc.getOutputLocation() + " (" +
bytesRead + " instead of " +
mapOutputLength + ")"
);
}
return mapOutput;
}
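shuffleToDisk() streams the map output through a fixed 64 KB buffer. It then checks that exactly the advertised number of bytes (the MAP_OUTPUT_LENGTH header) arrived. A minimal sketch of that loop with plain java.io streams standing in for the HTTP stream and the local file; the class name is hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Hypothetical sketch of a buffered copy followed by a length sanity check. */
final class BoundedCopySketch {
  static long copyExactly(InputStream in, OutputStream out, long expectedLength)
      throws IOException {
    byte[] buf = new byte[64 * 1024];
    long bytesRead = 0;
    int n;
    while ((n = in.read(buf, 0, buf.length)) > 0) {
      bytesRead += n;
      out.write(buf, 0, n);
    }
    // A short (or long) read means the transfer broke; the caller discards the output.
    if (bytesRead != expectedLength) {
      throw new IOException("Incomplete map output received (" + bytesRead
          + " instead of " + expectedLength + ")");
    }
    return bytesRead;
  }
}
```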

9. The LocalFSMerger thread's run method. It merges the local on-disk copies of the map outputs.

public void run() {
try {
LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
while(!exitLocalFSMerge){
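// mapOutputFilesOnDisk, a TreeSet<FileStatus>(mapOutputFileComparator), holds the local map output files.
// Wait until enough files have accumulated on disk, or until fetchOutputs() asks us to exit.
synchronized (mapOutputFilesOnDisk) {
while (!exitLocalFSMerge &&
mapOutputFilesOnDisk.size() < (2 * ioSortFactor - 1)) {
mapOutputFilesOnDisk.wait();
}
}
if (exitLocalFSMerge) { // do not run one extra merge at the end
break;
}
List<Path> mapFiles = new ArrayList<Path>();
long approxOutputSize = 0;
int bytesPerSum =
reduceTask.getConf().getInt("io.bytes.per.checksum", 512);
LOG.info(reduceTask.getTaskID() + " We have " +
mapOutputFilesOnDisk.size() + " map outputs on disk. " +
"Triggering merge of " + ioSortFactor + " files");
// 1. Prepare the list of files to be merged. This list is prepared
// using a list of map output files on disk. Currently we merge
// io.sort.factor files into 1.
// i.e. build List<Path> mapFiles from io.sort.factor files, ready to be merged.
synchronized (mapOutputFilesOnDisk) {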
for (int i = 0; i < ioSortFactor; ++i) {
FileStatus filestatus = mapOutputFilesOnDisk.first();
mapOutputFilesOnDisk.remove(filestatus);
mapFiles.add(filestatus.getPath());
approxOutputSize += filestatus.getLen();
}
}
// add the checksum length
approxOutputSize += ChecksumFileSystem
.getChecksumLength(approxOutputSize,
bytesPerSum);
// 2. Merge the files in the list.
Path outputPath =
lDirAlloc.getLocalPathForWrite(mapFiles.get(0).toString(),
approxOutputSize, conf)
.suffix(".merged");
Writer writer =
new Writer(conf,rfs, outputPath,
conf.getMapOutputKeyClass(),
conf.getMapOutputValueClass(),
codec, null);
RawKeyValueIterator iter = null;
Path tmpDir = new Path(reduceTask.getTaskID().toString());
try {
iter = Merger.merge(conf, rfs,
conf.getMapOutputKeyClass(),
conf.getMapOutputValueClass(),
codec, mapFiles.toArray(new Path[mapFiles.size()]),
true, ioSortFactor, tmpDir,
conf.getOutputKeyComparator(), reporter,
spilledRecordsCounter, null);
Merger.writeFile(iter, writer, reporter, conf);
writer.close();
} catch (Exception e) {
localFileSys.delete(outputPath, true);
throw new IOException (StringUtils.stringifyException(e));
}
synchronized (mapOutputFilesOnDisk) {
addToMapOutputFilesOnDisk(localFileSys.getFileStatus(outputPath));
}
LOG.info(reduceTask.getTaskID() +
" Finished merging " + mapFiles.size() +
" map output files on disk of total-size " +
approxOutputSize + "." +
" Local output file is " + outputPath + " of size " +
localFileSys.getFileStatus(outputPath).getLen());
}
} catch (Throwable t) {
LOG.warn(reduceTask.getTaskID()
+ " Merging of the local FS files threw an exception: "
+ StringUtils.stringifyException(t));
if (mergeThrowable == null) {
mergeThrowable = t;
}
}
}
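One LocalFSMerger pass picks the smallest files first, because mapOutputFilesOnDisk is a TreeSet ordered by mapOutputFileComparator. It removes io.sort.factor of them and estimates the merged size plus checksum overhead. The sketch below mirrors that under stated assumptions. First, as in the Hadoop 0.20 source, a merge starts only once at least 2 * io.sort.factor - 1 files are on disk. Second, the checksum overhead is 4 bytes per io.bytes.per.checksum chunk plus an 8-byte header, which is our understanding of ChecksumFileSystem.getChecksumLength. SpillFile is a hypothetical stand-in for FileStatus.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical sketch of one on-disk merge pass: choose files and estimate the output size. */
final class DiskMergePassSketch {
  static final class SpillFile {
    final String path;
    final long len;
    SpillFile(String path, long len) { this.path = path; this.len = len; }
  }

  /** Smallest files first; ties broken by path so equal sizes are not collapsed. */
  static final Comparator<SpillFile> BY_SIZE =
      Comparator.<SpillFile>comparingLong(f -> f.len).thenComparing(f -> f.path);

  static boolean shouldMerge(int filesOnDisk, int ioSortFactor) {
    return filesOnDisk >= 2 * ioSortFactor - 1;          // assumed threshold
  }

  static long checksumLength(long size, int bytesPerSum) {
    return ((size + bytesPerSum - 1) / bytesPerSum) * 4 + 8;  // assumed formula
  }

  /** Removes up to ioSortFactor of the smallest files from onDisk and returns them. */
  static List<SpillFile> takeSmallest(TreeSet<SpillFile> onDisk, int ioSortFactor) {
    List<SpillFile> picked = new ArrayList<>();
    for (int i = 0; i < ioSortFactor && !onDisk.isEmpty(); i++) {
      picked.add(onDisk.pollFirst());
    }
    return picked;
  }

  static long approxOutputSize(List<SpillFile> files, int bytesPerSum) {
    long size = 0;
    for (SpillFile f : files) size += f.len;
    return size + checksumLength(size, bytesPerSum);
  }
}
```

Merging the smallest files first keeps each pass cheap and tends to even out file sizes before the final merge in createKVIterator.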

10. The InMemFSMergeThread thread's run method.

public void run() {
LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
try {
boolean exit = false;
do {
exit = ramManager.waitForDataToMerge();
if (!exit) {
doInMemMerge();
}
} while (!exit);
} catch (Throwable t) {
LOG.warn(reduceTask.getTaskID() +
" Merge of the inmemory files threw an exception: "
+ StringUtils.stringifyException(t));
ReduceCopier.this.mergeThrowable = t;
}
}

11. InMemFSMergeThread's doInMemMerge method.

private void doInMemMerge() throws IOException{
if (mapOutputsFilesInMemory.size() == 0) {
return;
}
TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K,V>>();
long mergeOutputSize = createInMemorySegments(inMemorySegments, 0);
int noInMemorySegments = inMemorySegments.size();
Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
reduceTask.getTaskID(), mergeOutputSize);
Writer writer =
new Writer(conf, rfs, outputPath,
conf.getMapOutputKeyClass(),
conf.getMapOutputValueClass(),
codec, null);
RawKeyValueIterator rIter = null;
try {
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");
rIter = Merger.merge(conf, rfs,
(Class<K>)conf.getMapOutputKeyClass(),
(Class<V>)conf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceTask.getTaskID().toString()),
conf.getOutputKeyComparator(), reporter,
spilledRecordsCounter, null);
if (combinerRunner == null) {
Merger.writeFile(rIter, writer, reporter, conf);
} else {
combineCollector.setWriter(writer);
combinerRunner.combine(rIter, combineCollector);
}
writer.close();
LOG.info(reduceTask.getTaskID() +
" Merge of the " + noInMemorySegments +
" files in-memory complete." +
" Local file is " + outputPath + " of size " +
localFileSys.getFileStatus(outputPath).getLen());
} catch (Exception e) {
//make sure that we delete the ondisk file that we created
//earlier when we invoked cloneFileAttributes
localFileSys.delete(outputPath, true);
throw (IOException)new IOException
("Intermediate merge failed").initCause(e);
}
// Note the output of the merge
FileStatus status = localFileSys.getFileStatus(outputPath);
synchronized (mapOutputFilesOnDisk) {
addToMapOutputFilesOnDisk(status);
}
}
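InMemFSMergeThread blocks in ramManager.waitForDataToMerge() until enough data is in memory to be worth merging. doInMemMerge() then writes all of the in-memory outputs to one on-disk file, running the combiner on the way if one is configured. The sketch below models only the trigger, and it is deliberately simplified. It assumes two of the conditions we believe ShuffleRamManager checks in Hadoop 0.20. The first is that the buffer is at least mapred.job.shuffle.merge.percent full (assumed default 0.66) with at least two outputs. The second is that mapred.inmem.merge.threshold outputs (assumed default 1000) are held in memory. The real method also reacts to stalled copier threads, which is omitted here. The class name is hypothetical.

```java
/** Hypothetical, simplified sketch of when an in-memory merge is triggered. */
final class InMemMergeTriggerSketch {
  private final long bufferBytes;
  private final float mergePercent;
  private final int maxInMemOutputs;
  private long usedBytes;
  private int numOutputs;
  private boolean closed;

  InMemMergeTriggerSketch(long bufferBytes, float mergePercent, int maxInMemOutputs) {
    this.bufferBytes = bufferBytes;
    this.mergePercent = mergePercent;
    this.maxInMemOutputs = maxInMemOutputs;
  }

  /** A copier finished shuffling one output into memory. */
  synchronized void addOutput(long bytes) {
    usedBytes += bytes;
    numOutputs++;
    notifyAll();
  }

  /** fetchOutputs() is done (cf. ramManager.close()). */
  synchronized void close() {
    closed = true;
    notifyAll();
  }

  /** Should a merge start now? */
  synchronized boolean shouldMerge() {
    boolean memoryFull = (float) usedBytes / bufferBytes >= mergePercent && numOutputs >= 2;
    boolean tooMany = maxInMemOutputs > 0 && numOutputs >= maxInMemOutputs;
    return memoryFull || tooMany;
  }

  /** Like waitForDataToMerge(): true means "closed, exit"; false means "merge now". */
  synchronized boolean waitForDataToMerge() throws InterruptedException {
    while (!closed && !shouldMerge()) {
      wait();
    }
    return closed;
  }

  /** Called after a merge moved everything in memory to disk. */
  synchronized void mergedAll() {
    usedBytes = 0;
    numOutputs = 0;
  }
}
```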

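12. The ReduceCopier.GetMapEventsThread thread's run method. It asks the TaskTracker for map completion events over RPC. For each completed event, it obtains the address of the server where the map task ran, i.e. the location of the MapTask output. It then builds a URL and adds it to mapLocations for the copier threads to pick up.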

public void run() {

LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
do {
try {
int numNewMaps = getMapCompletionEvents();
if (numNewMaps > 0) {
LOG.info(reduceTask.getTaskID() + ": " +
"Got " + numNewMaps + " new map-outputs");
}
Thread.sleep(SLEEP_TIME);
}
catch (InterruptedException e) {
LOG.warn(reduceTask.getTaskID() +
" GetMapEventsThread returning after an " +
" interrupted exception");
return;
}
catch (Throwable t) {
LOG.warn(reduceTask.getTaskID() +
" GetMapEventsThread Ignoring exception : " +
StringUtils.stringifyException(t));
}
} while (!exitGetMapEvents);
LOG.info("GetMapEventsThread exiting");
}
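GetMapEventsThread is a simple polling loop. It asks for completion events starting at a cursor (fromEventId) and advances the cursor by the number of events received. It then sleeps SLEEP_TIME and repeats, stopping when exitGetMapEvents is set or the thread is interrupted. Below is a hypothetical sketch of that loop, with an EventSource interface standing in for the umbilical getMapCompletionEvents RPC. The reset() method mirrors the update.shouldReset() branch, which rewinds the cursor after a JobTracker restart.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of cursor-based polling for map completion events. */
final class EventPollerSketch {
  interface EventSource {
    /** Returns up to max events whose index is >= fromEventId. */
    List<String> fetch(int fromEventId, int max);
  }

  private final EventSource source;
  private final AtomicInteger fromEventId = new AtomicInteger(0);
  final List<String> seen = new ArrayList<>();
  volatile boolean exit = false;

  EventPollerSketch(EventSource source) { this.source = source; }

  /** One poll: fetch new events and advance the cursor. Returns how many were new. */
  int pollOnce(int maxEvents) {
    List<String> events = source.fetch(fromEventId.get(), maxEvents);
    fromEventId.addAndGet(events.size());
    seen.addAll(events);
    return events.size();
  }

  /** Rewind the cursor, as the code does when update.shouldReset() is true. */
  void reset() { fromEventId.set(0); }

  /** The thread body: poll, sleep, repeat until told to exit or interrupted. */
  void runLoop(long sleepMillis, int maxEvents) {
    do {
      pollOnce(maxEvents);
      try {
        Thread.sleep(sleepMillis);
      } catch (InterruptedException e) {
        return;
      }
    } while (!exit);
  }
}
```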

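13. ReduceCopier.GetMapEventsThread's getMapCompletionEvents method. It asks the TaskTracker for map completion events over RPC. For each completed event, it obtains the address of the server where the map task ran, builds a URL, and adds it to mapLocations.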

private int getMapCompletionEvents() throws IOException {
int numNewMaps = 0;
// RPC call to the TaskTracker's getMapCompletionEvents method. It returns a
// MapTaskCompletionEventsUpdate, which carries the TaskCompletionEvents.
MapTaskCompletionEventsUpdate update =
umbilical.getMapCompletionEvents(reduceTask.getJobID(),
fromEventId.get(),
MAX_EVENTS_TO_FETCH,
reduceTask.getTaskID());
TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();
// Check if the reset is required.
// Since there is no ordering of the task completion events at the
// reducer, the only option to sync with the new jobtracker is to reset
// the events index
if (update.shouldReset()) {
fromEventId.set(0);
obsoleteMapIds.clear(); // clear the obsolete map
mapLocations.clear(); // clear the map locations mapping
} // Update the last seen event ID
fromEventId.set(fromEventId.get() + events.length); // Process the TaskCompletionEvents:
// 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
// 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
// fetching from those maps.
// 3. Remove TIPFAILED maps from neededOutputs since we don't need their
// outputs at all.
//对每个完成的Event,获取maptask所在的服务器地址,构造URL,加入到mapLocations,供copier线程获取。
for (TaskCompletionEvent event : events) {
switch (event.getTaskStatus()) {
case SUCCEEDED:
{
URI u = URI.create(event.getTaskTrackerHttp());
String host = u.getHost();
TaskAttemptID taskId = event.getTaskAttemptId();
int duration = event.getTaskRunTime();
if (duration > maxMapRuntime) {
maxMapRuntime = duration;
// adjust max-fetch-retries based on max-map-run-time
maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP,
getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1));
}
URL mapOutputLocation = new URL(event.getTaskTrackerHttp() +
"/mapOutput?job=" + taskId.getJobID() +
"&map=" + taskId +
"&reduce=" + getPartition());
List<MapOutputLocation> loc = mapLocations.get(host);
if (loc == null) {
loc = Collections.synchronizedList
(new LinkedList<MapOutputLocation>());
mapLocations.put(host, loc);
}
loc.add(new MapOutputLocation(taskId, host, mapOutputLocation));
numNewMaps ++;
}
break;
case FAILED:
case KILLED:
case OBSOLETE:
{
obsoleteMapIds.add(event.getTaskAttemptId());
LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
" map-task: '" + event.getTaskAttemptId() + "'");
}
break;
case TIPFAILED:
{
copiedMapOutputs.add(event.getTaskAttemptId().getTaskID());
LOG.info("Ignoring output of failed map TIP: '" +
event.getTaskAttemptId() + "'");
}
break;
}
}
return numNewMaps;
}
}
}
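The event handling above can be modelled without Hadoop. The sketch below shows four things: a cursor (`fromEventId`) that advances by the batch size, a full reset when the JobTracker asks for one, the URL format `/mapOutput?job=...&map=...&reduce=...`, and per-host grouping of SUCCEEDED outputs. `MapEventsSketch`, `Event` and `Status` are hypothetical simplifications. The sketch is single-threaded, whereas the real code uses synchronized lists shared with the copier threads. It records attempt IDs for TIPFAILED, while Hadoop records the task ID in `copiedMapOutputs`.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, single-threaded model of getMapCompletionEvents()' bookkeeping.
final class MapEventsSketch {
    enum Status { SUCCEEDED, FAILED, KILLED, OBSOLETE, TIPFAILED }

    /** Simplified stand-in for TaskCompletionEvent. */
    static final class Event {
        final String attemptId;
        final String jobId;
        final String trackerHttp;
        final Status status;

        Event(String attemptId, String jobId, String trackerHttp, Status status) {
            this.attemptId = attemptId;
            this.jobId = jobId;
            this.trackerHttp = trackerHttp;
            this.status = status;
        }
    }

    final Map<String, List<String>> mapLocations = new HashMap<>(); // host -> map-output URLs
    final Set<String> obsoleteAttempts = new HashSet<>();           // FAILED/KILLED/OBSOLETE
    final Set<String> abandonedAttempts = new HashSet<>();          // TIPFAILED: never fetched
    int fromEventId = 0;                                            // next event index to request

    /** Processes one batch of events; returns how many new map outputs became fetchable. */
    int process(List<Event> events, int partition, boolean shouldReset) {
        if (shouldReset) {                 // resync with a restarted JobTracker
            fromEventId = 0;
            obsoleteAttempts.clear();
            mapLocations.clear();
        }
        fromEventId += events.size();
        int numNewMaps = 0;
        for (Event e : events) {
            switch (e.status) {
                case SUCCEEDED: {
                    String host = URI.create(e.trackerHttp).getHost();
                    String url = e.trackerHttp + "/mapOutput?job=" + e.jobId
                            + "&map=" + e.attemptId + "&reduce=" + partition;
                    mapLocations.computeIfAbsent(host, h -> new ArrayList<>()).add(url);
                    numNewMaps++;
                    break;
                }
                case FAILED:
                case KILLED:
                case OBSOLETE:
                    obsoleteAttempts.add(e.attemptId);
                    break;
                case TIPFAILED:
                    abandonedAttempts.add(e.attemptId);
                    break;
            }
        }
        return numNewMaps;
    }
}
```

Grouping by host lets a copier take all pending outputs from one TaskTracker together, which is why the real `mapLocations` is keyed by host.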

13. The createKVIterator method of ReduceTask.ReduceCopier. It builds a RawKeyValueIterator over the copied map outputs, which serves as the input to the reduce phase.

private RawKeyValueIterator createKVIterator(
JobConf job, FileSystem fs, Reporter reporter) throws IOException {
// merge config params
Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
boolean keepInputs = job.getKeepFailedTaskFiles();
final Path tmpDir = new Path(getTaskID().toString());
final RawComparator<K> comparator =
(RawComparator<K>)job.getOutputKeyComparator();
// segments required to vacate memory
List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
long inMemToDiskBytes = 0;
if (mapOutputsFilesInMemory.size() > 0) {
TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
inMemToDiskBytes = createInMemorySegments(memDiskSegments,
maxInMemReduce);
final int numMemDiskSegments = memDiskSegments.size();
if (numMemDiskSegments > 0 &&
ioSortFactor > mapOutputFilesOnDisk.size()) {
// must spill to disk, but can't retain in-mem for intermediate merge
final Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
reduceTask.getTaskID(), inMemToDiskBytes);
final RawKeyValueIterator rIter = Merger.merge(job, fs,
keyClass, valueClass, memDiskSegments, numMemDiskSegments,
tmpDir, comparator, reporter, spilledRecordsCounter, null);
final Writer writer = new Writer(job, fs, outputPath,
keyClass, valueClass, codec, null);
try {
Merger.writeFile(rIter, writer, reporter, job);
addToMapOutputFilesOnDisk(fs.getFileStatus(outputPath));
} catch (Exception e) {
if (null != outputPath) {
fs.delete(outputPath, true);
}
throw new IOException("Final merge failed", e);
} finally {
if (null != writer) {
writer.close();
}
}
LOG.info("Merged " + numMemDiskSegments + " segments, " +
inMemToDiskBytes + " bytes to disk to satisfy " +
"reduce memory limit");
inMemToDiskBytes = 0;
memDiskSegments.clear();
} else if (inMemToDiskBytes != 0) {
LOG.info("Keeping " + numMemDiskSegments + " segments, " +
inMemToDiskBytes + " bytes in memory for " +
"intermediate, on-disk merge");
}
}
// segments on disk
List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
long onDiskBytes = inMemToDiskBytes;
Path[] onDisk = getMapFiles(fs, false);
for (Path file : onDisk) {
onDiskBytes += fs.getFileStatus(file).getLen();
diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs));
}
LOG.info("Merging " + onDisk.length + " files, " +
onDiskBytes + " bytes from disk");
Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
public int compare(Segment<K, V> o1, Segment<K, V> o2) {
if (o1.getLength() == o2.getLength()) {
return 0;
}
return o1.getLength() < o2.getLength() ? -1 : 1;
}
});
// build final list of segments from merged backed by disk + in-mem
List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();
long inMemBytes = createInMemorySegments(finalSegments, 0);
LOG.info("Merging " + finalSegments.size() + " segments, " +
inMemBytes + " bytes from memory into reduce");
if (0 != onDiskBytes) {
final int numInMemSegments = memDiskSegments.size();
diskSegments.addAll(0, memDiskSegments);
memDiskSegments.clear();
RawKeyValueIterator diskMerge = Merger.merge(
job, fs, keyClass, valueClass, diskSegments,
ioSortFactor, numInMemSegments, tmpDir, comparator,
reporter, false, spilledRecordsCounter, null);
diskSegments.clear();
if (0 == finalSegments.size()) {
return diskMerge;
}
finalSegments.add(new Segment<K,V>(
new RawKVIteratorReader(diskMerge, onDiskBytes), true));
}
return Merger.merge(job, fs, keyClass, valueClass,
finalSegments, finalSegments.size(), tmpDir,
comparator, reporter, spilledRecordsCounter, null);
}
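At its core, `Merger.merge` is a k-way merge of segments that are already sorted, limited to `ioSortFactor` inputs per pass. That limit is why the disk segments are first sorted by length: merging the smallest segments first keeps the bytes rewritten by intermediate passes low. Below is a simplified sketch of that idea using sorted `List<String>` segments and a min-heap. It is not Hadoop's actual pass-planning logic. `KWayMergeSketch`, `mergeAll` and `factor` are hypothetical names, and real segments are serialized key/value pairs compared with a `RawComparator`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: multi-pass k-way merge of sorted segments, smallest segments first.
final class KWayMergeSketch {

    /** Merges sorted segments using at most `factor` inputs per pass (factor plays ioSortFactor). */
    static List<String> mergeAll(List<List<String>> segments, int factor) {
        if (factor < 2) {
            throw new IllegalArgumentException("factor must be at least 2");
        }
        // Work queue ordered by segment size, like the Collections.sort by getLength() above.
        PriorityQueue<List<String>> bySize = new PriorityQueue<>(Comparator.comparingInt(List::size));
        bySize.addAll(segments);
        // Intermediate passes: merge the `factor` smallest segments until one final pass suffices.
        while (bySize.size() > factor) {
            List<List<String>> batch = new ArrayList<>();
            for (int i = 0; i < factor; i++) {
                batch.add(bySize.poll());
            }
            bySize.add(mergeOnce(batch));
        }
        return mergeOnce(new ArrayList<>(bySize));
    }

    /** One k-way merge pass: a min-heap holds {segment index, position} of each segment's head. */
    static List<String> mergeOnce(List<List<String>> sorted) {
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                (a, b) -> sorted.get(a[0]).get(a[1]).compareTo(sorted.get(b[0]).get(b[1])));
        for (int i = 0; i < sorted.size(); i++) {
            if (!sorted.get(i).isEmpty()) {
                heap.add(new int[] {i, 0});
            }
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            List<String> seg = sorted.get(top[0]);
            out.add(seg.get(top[1]));
            if (top[1] + 1 < seg.size()) {
                heap.add(new int[] {top[0], top[1] + 1});   // advance within that segment
            }
        }
        return out;
    }
}
```

For example, `mergeAll(List.of(List.of("a","d"), List.of("b"), List.of("c")), 2)` first merges two of the one-element segments and then merges the remaining two segments into a single sorted list.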

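14. The runNewReducer method of ReduceTask. It builds the Reducer and its runtime context from the job configuration and then calls the reducer's run method, which in turn calls reduce.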

@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewReducer(JobConf job,
final TaskUmbilicalProtocol umbilical,
final TaskReporter reporter,
RawKeyValueIterator rIter,
RawComparator<INKEY> comparator,
Class<INKEY> keyClass,
Class<INVALUE> valueClass
) throws IOException,InterruptedException,
ClassNotFoundException {
//1. Build the TaskAttemptContext
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
//2. Instantiate the configured Reducer class via reflection
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer = (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
//3. Create the RecordWriter
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output =
(org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>)
outputFormat.getRecordWriter(taskContext);
job.setBoolean("mapred.skip.on", isSkipping());
//4. Create the Context in which the Reducer runs
org.apache.hadoop.mapreduce.Reducer.Context
reducerContext = createReduceContext(reducer, job, getTaskID(),
rIter, reduceInputValueCounter,
output, committer,
reporter, comparator, keyClass,
valueClass);
reducer.run(reducerContext);
output.close(reducerContext);
}
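Step 2 above creates the user's Reducer purely from its configured class. For a class with a no-arg constructor, `ReflectionUtils.newInstance` essentially looks up that constructor and invokes it. The real Hadoop helper also caches constructors and configures the new object with the job configuration, and this sketch omits both. `ReflectiveFactorySketch` and `MyReducer` are hypothetical names.

```java
import java.lang.reflect.Constructor;

// Hypothetical sketch of reflection-based instantiation of a configured class.
final class ReflectiveFactorySketch {

    /** Instantiates cls through its no-arg constructor, even if that constructor is not public. */
    static <T> T newInstance(Class<T> cls) {
        try {
            Constructor<T> ctor = cls.getDeclaredConstructor();
            ctor.setAccessible(true);
            return ctor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Cannot instantiate " + cls.getName(), e);
        }
    }

    /** Same, starting from a class name as it might be read from a configuration. */
    static Object newInstanceByName(String className) {
        try {
            return newInstance(Class.forName(className));
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    /** Hypothetical user reducer class standing in for taskContext.getReducerClass(). */
    static class MyReducer {
        String name() {
            return "MyReducer";
        }
    }
}
```

This is why a user-defined Reducer needs an accessible no-arg constructor. The framework cannot pass constructor arguments and relies on `setup()` and the context instead.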

15. The run method of the Reducer class. For each key in the context, it takes the key and its collection of values (an Iterable<VALUEIN>) and passes them to the reducer's reduce method.

public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
}
cleanup(context);
}
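`run` is a template method: call `setup` once, call `reduce` once per distinct key, then call `cleanup` once. The grouping of values per key comes from the context, which walks input that is already sorted. Below is a small self-contained model of that contract. `MiniReducerSketch`, its `Context` and `SumReducer` are hypothetical. This `Context` collects each key's values into a list, while Hadoop's values iterator streams them from the merged input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of Reducer.run() over key-sorted input.
final class MiniReducerSketch {

    /** Groups consecutive equal keys of a key-sorted input, like nextKey()/getValues(). */
    static final class Context {
        private final List<Map.Entry<String, Integer>> input;
        private int pos = 0;
        private String currentKey;
        private List<Integer> currentValues;
        final List<String> output = new ArrayList<>();

        Context(List<Map.Entry<String, Integer>> sortedInput) {
            this.input = sortedInput;
        }

        boolean nextKey() {
            if (pos >= input.size()) {
                return false;
            }
            currentKey = input.get(pos).getKey();
            currentValues = new ArrayList<>();
            while (pos < input.size() && input.get(pos).getKey().equals(currentKey)) {
                currentValues.add(input.get(pos).getValue());
                pos++;
            }
            return true;
        }

        String getCurrentKey() { return currentKey; }
        Iterable<Integer> getValues() { return currentValues; }
        void write(String key, Object value) { output.add(key + "\t" + value); }
    }

    static class Reducer {
        protected void setup(Context c) {}
        protected void cleanup(Context c) {}

        /** Default: identity, writing every value back out under its key. */
        protected void reduce(String key, Iterable<Integer> values, Context c) {
            for (Integer v : values) {
                c.write(key, v);
            }
        }

        public void run(Context c) {
            setup(c);
            while (c.nextKey()) {
                reduce(c.getCurrentKey(), c.getValues(), c);
            }
            cleanup(c);
        }
    }

    /** A user override that sums each key's values (word-count style). */
    static class SumReducer extends Reducer {
        @Override
        protected void reduce(String key, Iterable<Integer> values, Context c) {
            int sum = 0;
            for (int v : values) {
                sum += v;
            }
            c.write(key, sum);
        }
    }
}
```

Feeding `("a",1), ("a",2), ("b",5)` through `SumReducer` yields one line per key, while the base `Reducer` passes all three pairs through unchanged.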

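16. The reduce method of the Reducer class. Users normally override it with their own reduce logic. The default implementation is an identity reduce: it writes every value back out unchanged under its key.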

@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                      ) throws IOException, InterruptedException {
  for(VALUEIN value: values) {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }
}

The end.

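To keep reposted copies consistent and traceable, and so that updates and corrections reach readers promptly, please credit the source when reposting: http://www.cnblogs.com/douba/p/hadoop_mapreduce_tasktracker_child_reduce.html. Thank you!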