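A MapReduce Job can be submitted either locally or to a cluster; the two paths are handled by org.apache.hadoop.mapred.LocalJobRunner and org.apache.hadoop.mapred.YARNRunner respectively. This article walks through the local submission path. The walkthrough relies on remote JVM debugging; for the setup, see the post "Remote JVM debugging in Eclipse (using NameNode startup as an example)".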
The MapReduce program, the debugging steps, and the material covered in this article have been packaged and uploaded for download.
Source code walkthrough
org.apache.hadoop.mapreduce.Job
The waitForCompletion() method
/**
* Submit the job to the cluster and wait for it to finish.
* @param verbose print the progress to the user
* @return true if the job succeeded
* @throws IOException thrown if the communication with the
* <code>JobTracker</code> is lost
*/
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
// Check the job state: only a job still in the DEFINE state gets submitted
if (state == JobState.DEFINE) {
// Call this Job's own submit() method
submit();
}
if (verbose) {
monitorAndPrintJob();
} else {
// get the completion poll interval from the client.
int completionPollIntervalMillis =
Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
return isSuccessful();
}
---------------------------------------------------------------------------------------
The submit() method
/**
* Submit the job to the cluster and return immediately.
* @throws IOException
*/
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
// Make sure the job is still in the DEFINE state
ensureState(JobState.DEFINE);
// Set up use of the new API
setUseNewAPI();
// Connect to the cluster
connect();
// Create a job submitter from the cluster's file system and client
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
// Anonymous inner class: new PrivilegedExceptionAction<JobStatus>()
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
// The most important step: call JobSubmitter.submitJobInternal(), the internal submit method
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
---------------------------------------------------------------------------------------
The connect() method
// Uses an anonymous inner class, new PrivilegedExceptionAction<Cluster>(),
// to create a Cluster object and assign it to cluster (a member field of Job)
private synchronized void connect()
throws IOException, InterruptedException, ClassNotFoundException {
if (cluster == null) {
cluster =
ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run()
throws IOException, InterruptedException,
ClassNotFoundException {
// Pass the configuration returned by getConfiguration() to the Cluster
return new Cluster(getConfiguration());
}
});
}
}
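The control flow of waitForCompletion() and submit() above can be condensed into a small sketch that runs without Hadoop. SketchJob, its pollsUntilDone parameter, and the submitCalls counter are hypothetical names introduced for illustration; only the DEFINE/RUNNING states and the submit-then-poll shape are taken from the code above. Unlike the excerpt, the sketch restores the interrupt flag instead of swallowing the InterruptedException.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class WaitForCompletionSketch {
    enum JobState { DEFINE, RUNNING }

    /** Hypothetical stand-in for Job; it only mimics the control flow. */
    static final class SketchJob {
        private JobState state = JobState.DEFINE;
        private final AtomicInteger pollsLeft;
        int submitCalls = 0;

        SketchJob(int pollsUntilDone) {
            this.pollsLeft = new AtomicInteger(pollsUntilDone);
        }

        /** Mirrors ensureState(DEFINE) followed by the switch to RUNNING. */
        void submit() {
            if (state != JobState.DEFINE) {
                throw new IllegalStateException("Job in state " + state + " instead of DEFINE");
            }
            submitCalls++;
            state = JobState.RUNNING;
        }

        /** Pretends the job finishes after a fixed number of polls. */
        boolean isComplete() {
            return pollsLeft.getAndDecrement() <= 0;
        }

        boolean isSuccessful() {
            return true;
        }

        /** Submit if still in DEFINE, then poll until the job reports completion. */
        boolean waitForCompletion(long pollIntervalMillis) {
            if (state == JobState.DEFINE) {
                submit();
            }
            while (!isComplete()) {
                try {
                    Thread.sleep(pollIntervalMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                    return false;
                }
            }
            return isSuccessful();
        }
    }

    public static void main(String[] args) {
        SketchJob job = new SketchJob(3);
        System.out.println("succeeded: " + job.waitForCompletion(10L));
    }
}
```

Calling submit() a second time on the same SketchJob throws, which is the behaviour the ensureState(JobState.DEFINE) check is there to enforce.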
org.apache.hadoop.mapreduce.JobSubmitter
JobSubmitter.submitJobInternal(){
...
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
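// Validate the job's output specs; an exception is thrown if the output directory already exists
checkSpecs(job);
// Get the job's configuration
Configuration conf = job.getConfiguration();
// Add the MapReduce framework to the distributed cache (can be ignored for now)
addMRFrameworkToDistributedCache(conf);
// Get the job staging area, a temporary directory (on HDFS for a cluster submission)
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// Get the IP of the submitting client, i.e. the machine the job is submitted from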
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
// The submitting host's address
submitHostAddress = ip.getHostAddress();
// The submitting host's name
submitHostName = ip.getHostName();
// Record submitHostName and submitHostAddress in the configuration
conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
// Create a new job ID (every job has a unique ID)
JobID jobId = submitClient.getNewJobID();
// Set the job ID on the job
job.setJobID(jobId);
// Build a new path from the staging directory and the job ID
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
// Job status
JobStatus status = null;
try{
// Set the user name
conf.set(MRJobConfig.USER_NAME,
UserGroupInformation.getCurrentUser().getShortUserName());
// Set the HTTP filter initializer
conf.set("hadoop.http.filter.initializers",
"org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
// Set the job directory
// In this debugging session submitJobDir is file:/tmp/hadoop-陶/mapred/staging/ì?635285396/.staging/job_local635285396_0001,
// so a subdirectory named after the job ID is created under .staging/ to hold the job's files
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
...
// The long block that follows handles security and authentication (Hadoop supports pluggable authentication)
// generate a secret to authenticate shuffle transfers
if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
KeyGenerator keyGen;
try {
keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
keyGen.init(SHUFFLE_KEY_LENGTH);
} catch (NoSuchAlgorithmException e) {
throw new IOException("Error generating shuffle secret key", e);
}
SecretKey shuffleKey = keyGen.generateKey();
TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
job.getCredentials());
}
if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
"data spill is enabled");
}
// Copy and configure the job's files
copyAndConfigureFiles(job, submitJobDir);
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job (the number of map tasks depends on the number of splits)
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
int maps = writeSplits(job, submitJobDir);
// After writeSplits() (which creates the splits) returns, four files exist under .staging/job_local635285396_0001/:
// .job.split.crc and .job.splitmetainfo.crc (checksum files),
// job.split (the split file) and job.splitmetainfo (the split meta-info file)
// Set the number of map tasks
conf.setInt(MRJobConfig.NUM_MAPS, maps);
// Log the number of splits
LOG.info("number of splits:" + maps);
// The job is submitted to a job queue, which manages it
// write "queue admins of the queue to which job is being submitted"
// to job file.
String queue = conf.get(MRJobConfig.QUEUE_NAME,
JobConf.DEFAULT_QUEUE_NAME);
AccessControlList acl = submitClient.getQueueAdmins(queue);
conf.set(toFullPropertyName(queue,
QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
...
// Write the job file (job.xml) to the submit directory
// job.xml contains every parameter set in the four configuration files
writeConf(conf, submitJobFile);
...
// Here submitClient is a LocalJobRunner,
// so this call is LocalJobRunner.submitJob(), which submits the job through the runner
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
}
}
}
...
}
---------------------------------------------------------------------------------------
JobSubmitter{
...
// Set up conf from the command-line options and upload the job's resources
private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
throws IOException {
JobResourceUploader rUploader = new JobResourceUploader(jtFs);
rUploader.uploadFiles(job, jobSubmitDir);
// Get the working directory. If not set, sets it to filesystem working dir
// This code has been added so that working directory reset before running
// the job. This is necessary for backward compatibility as other systems
// might use the public API JobConf#setWorkingDirectory to reset the working
// directory.
job.getWorkingDirectory();
}
...
}
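The shuffle-secret step in submitJobInternal() uses only the standard javax.crypto API, so it can be tried in isolation. The excerpt does not show the values of SHUFFLE_KEYGEN_ALGORITHM and SHUFFLE_KEY_LENGTH; the sketch below assumes HmacSHA1 and 64 bits, and the class and method names are hypothetical.

```java
import java.security.NoSuchAlgorithmException;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;

final class ShuffleSecretSketch {
    // Assumed values: the excerpt names these constants but does not show them.
    static final String ALGORITHM = "HmacSHA1";
    static final int KEY_LENGTH_BITS = 64;

    /** Generates a fresh random secret, following the same calls as the excerpt. */
    static byte[] newShuffleSecret() {
        try {
            KeyGenerator keyGen = KeyGenerator.getInstance(ALGORITHM);
            keyGen.init(KEY_LENGTH_BITS);
            SecretKey shuffleKey = keyGen.generateKey();
            return shuffleKey.getEncoded();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("Error generating shuffle secret key", e);
        }
    }

    public static void main(String[] args) {
        byte[] secret = newShuffleSecret();
        System.out.println("secret length in bytes: " + secret.length);
    }
}
```

In the real code the encoded key is stored in the job's credentials through TokenCache.setShuffleSecretKey(), so the tasks can later authenticate shuffle transfers with it.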
org.apache.hadoop.mapred.LocalJobRunner
LocalJobRunner{
...
public org.apache.hadoop.mapreduce.JobStatus submitJob(
org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,
Credentials credentials) throws IOException {
// Create an instance of the inner class LocalJobRunner.Job
Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);
job.job.setCredentials(credentials);
return job.status;
}
...
}
----------------------------------------------------------------------------------------
LocalJobRunner.Job extends Thread implements TaskUmbilicalProtocol{
...
public Job(JobID jobid, String jobSubmitDir) throws IOException {
// Build a JobConf from job.xml in the staging directory
this.systemJobDir = new Path(jobSubmitDir);
this.systemJobFile = new Path(systemJobDir, "job.xml");
this.id = jobid;
JobConf conf = new JobConf(systemJobFile);
// Get the local file system
this.localFs = FileSystem.getLocal(conf);
String user = UserGroupInformation.getCurrentUser().getShortUserName();
// Compute the local job directory
this.localJobDir = localFs.makeQualified(new Path(
new Path(conf.getLocalPath(jobDir), user), jobid.toString()));
// Compute the local job file
this.localJobFile = new Path(this.localJobDir, id + ".xml");
// Manage the distributed cache. If there are files to be copied,
// this will trigger localFile to be re-written again.
localDistributedCacheManager = new LocalDistributedCacheManager();
localDistributedCacheManager.setup(conf);
// Write a configuration file through an output stream
// Write out configuration file. Instead of copying it from
// systemJobFile, we re-write it, since setup(), above, may have
// updated it.
OutputStream out = localFs.create(localJobFile);
try {
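// Write to the stream, producing job_local2086240933_0001.xml
conf.writeXml(out);
} finally {
// Once the write succeeds, both job_local2086240933_0001.xml and job_local2086240933_0001.xml.crc exist
out.close();
}
// Everything above only prepares directories and configuration files; no task has run yet
// Create a JobConf from localJobFile
this.job = new JobConf(localJobFile);
...
// The final goal: start the thread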
this.start();
...
@Override
public void run() {
// Get the job ID
JobID jobId = profile.getJobID();
// Build the job context
JobContext jContext = new JobContextImpl(job, jobId);
org.apache.hadoop.mapreduce.OutputCommitter outputCommitter = null;
try {
outputCommitter = createOutputCommitter(conf.getUseNewMapper(), jobId, conf);
} catch (Exception e) {
LOG.info("Failed to createOutputCommitter", e);
return;
}
try {
// Read the split meta-info for the tasks
TaskSplitMetaInfo[] taskSplitMetaInfos =
SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
// Get the number of reduce tasks
int numReduceTasks = job.getNumReduceTasks();
// Set up the job
outputCommitter.setupJob(jContext);
// Mark the setup progress as complete
status.setSetupProgress(1.0f);
// Map output files, keyed by task attempt ID
Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());
// Build one runnable per map task (class org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable)
List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(
taskSplitMetaInfos, jobId, mapOutputFiles);
// Initialize the counters
initCounters(mapRunnables.size(), numReduceTasks);
// Create the thread-pool executor for the map tasks
ExecutorService mapService = createMapExecutor();
// Run the map tasks
// Note: MapReduce uses a thread pool here; tasks are queued and executed at some later point
runTasks(mapRunnables, mapService, "map");
try {
// Run the reduce phase only if there is at least one reduce task
if (numReduceTasks > 0) {
// Build one runnable per reduce task
List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(
jobId, mapOutputFiles);
ExecutorService reduceService = createReduceExecutor();
// Run the reduce tasks
runTasks(reduceRunnables, reduceService, "reduce");
}
} finally {
for (MapOutputFile output : mapOutputFiles.values()) {
output.removeAll();
}
}
// delete the temporary directory in output directory
outputCommitter.commitJob(jContext);
status.setCleanupProgress(1.0f);
if (killed) {
this.status.setRunState(JobStatus.KILLED);
} else {
this.status.setRunState(JobStatus.SUCCEEDED);
}
JobEndNotifier.localRunnerNotification(job, status);
} catch (Throwable t) {
try {
outputCommitter.abortJob(jContext,
org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
} catch (IOException ioe) {
LOG.info("Error cleaning up job:" + id);
}
status.setCleanupProgress(1.0f);
if (killed) {
this.status.setRunState(JobStatus.KILLED);
} else {
this.status.setRunState(JobStatus.FAILED);
}
LOG.warn(id, t);
JobEndNotifier.localRunnerNotification(job, status);
} finally {
try {
fs.delete(systemJobFile.getParent(), true); // delete submit dir
localFs.delete(localJobFile, true); // delete local copy
// Cleanup distributed cache
localDistributedCacheManager.close();
} catch (IOException e) {
LOG.warn("Error cleaning up "+id+": "+e);
}
}
}
...
/** Run a set of tasks and waits for them to complete. */
private void runTasks(List<RunnableWithThrowable> runnables,
ExecutorService service, String taskType) throws Exception {
// Start populating the executor with work units.
// They may begin running immediately (in other threads).
for (Runnable r : runnables) {
// Submit to the thread pool, which executes the map or reduce runnables
service.submit(r);
}
...
}
...
MapTaskRunnable{
...
public void run() {
try {
// Create the attempt ID for this map task
TaskAttemptID mapId = new TaskAttemptID(new TaskID(
jobId, TaskType.MAP, taskId), 0);
LOG.info("Starting task: " + mapId);
// Add mapId to the mapIds collection
mapIds.add(mapId);
// Construct a MapTask from the job file (really job.xml, visible in the debugger's watch view), the map attempt ID, the task ID, and the split index
MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,
info.getSplitIndex(), 1);
map.setUser(UserGroupInformation.getCurrentUser().
getShortUserName());
// Set up the task's local directories
// map is a MapTask; in this debugging session its attempt ID was attempt_local335618588_0001_m_000000_0
// localConf holds the loaded configuration
setupChildMapredLocalDirs(map, localConf);
// Create a map output file holder
MapOutputFile mapOutput = new MROutputFiles();
// Give it the configuration
mapOutput.setConf(localConf);
mapOutputFiles.put(mapId, mapOutput);
// localJobFile.toString() is file:/tmp/hadoop-陶/mapred/local/localRunner/ì?/job_local335618588_0001/job_local335618588_0001.xml,
// i.e. the job_local335618588_0001.xml file
map.setJobFile(localJobFile.toString());
localConf.setUser(map.getUser());
map.localizeConfiguration(localConf);
map.setConf(localConf);
try {
map_tasks.getAndIncrement();
// launchMap() records the map launch in the metrics; map.run() below does the actual work
myMetrics.launchMap(mapId);
map.run(localConf, Job.this);
myMetrics.completeMap(mapId);
} finally {
map_tasks.getAndDecrement();
}
LOG.info("Finishing task: " + mapId);
} catch (Throwable e) {
this.storedException = e;
}
}
...
}
}
...
ReduceTaskRunnable{
...
public void run() {
try {
TaskAttemptID reduceId = new TaskAttemptID(new TaskID(
jobId, TaskType.REDUCE, taskId), 0);
LOG.info("Starting task: " + reduceId);
ReduceTask reduce = new ReduceTask(systemJobFile.toString(),
reduceId, taskId, mapIds.size(), 1);
reduce.setUser(UserGroupInformation.getCurrentUser().
getShortUserName());
setupChildMapredLocalDirs(reduce, localConf);
reduce.setLocalMapFiles(mapOutputFiles);
if (!Job.this.isInterrupted()) {
reduce.setJobFile(localJobFile.toString());
localConf.setUser(reduce.getUser());
reduce.localizeConfiguration(localConf);
reduce.setConf(localConf);
try {
reduce_tasks.getAndIncrement();
// Record the reduce launch in the metrics
myMetrics.launchReduce(reduce.getTaskID());
// Run the reduce task
reduce.run(localConf, Job.this);
myMetrics.completeReduce(reduce.getTaskID());
} finally {
reduce_tasks.getAndDecrement();
}
LOG.info("Finishing task: " + reduceId);
...
}
}
}
...
}
...
}
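The runTasks() excerpt above shows only the submission loop; the waiting part is elided. A minimal sketch of one way to submit work to a thread pool, wait for it, and surface a stored failure is given below. StoringRunnable is a hypothetical analogue of RunnableWithThrowable, and the one-minute timeout and the two-thread pool are arbitrary choices made for the example, not values taken from LocalJobRunner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class RunTasksSketch {
    /** Hypothetical analogue of RunnableWithThrowable: keeps a failure for later inspection. */
    abstract static class StoringRunnable implements Runnable {
        volatile Throwable storedException;

        abstract void doWork() throws Exception;

        @Override
        public void run() {
            try {
                doWork();
            } catch (Throwable t) {
                storedException = t; // same idea as MapTaskRunnable.run() above
            }
        }
    }

    /** Submits every runnable, waits for the pool to drain, then rethrows the first failure. */
    static void runTasks(List<StoringRunnable> runnables, ExecutorService service) {
        for (Runnable r : runnables) {
            service.submit(r);
        }
        service.shutdown(); // accept no new work; queued tasks still run
        try {
            if (!service.awaitTermination(1, TimeUnit.MINUTES)) {
                service.shutdownNow();
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException ie) {
            service.shutdownNow();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", ie);
        }
        for (StoringRunnable r : runnables) {
            if (r.storedException != null) {
                throw new IllegalStateException("task failed", r.storedException);
            }
        }
    }

    /** Runs taskCount trivial tasks on a two-thread pool and returns how many completed. */
    static int runCountingTasks(int taskCount) {
        AtomicInteger completed = new AtomicInteger();
        List<StoringRunnable> runnables = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            runnables.add(new StoringRunnable() {
                @Override
                void doWork() {
                    completed.incrementAndGet();
                }
            });
        }
        runTasks(runnables, Executors.newFixedThreadPool(2));
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runCountingTasks(4));
    }
}
```

Storing the Throwable inside the runnable matters because a pool thread cannot throw into the submitting thread; the submitter has to check each runnable after the pool has finished.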
org.apache.hadoop.mapred.MapTask
MapTask{
...
@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException {
this.umbilical = umbilical;
// Check whether this is a map task
if (isMapTask()) {
// If there are no reducers then there won't be any sort. Hence the map
// phase will govern the entire attempt's progress.
// With no reduce tasks, no sort phase is needed
if (conf.getNumReduceTasks() == 0) {
mapPhase = getProgress().addPhase("map", 1.0f);
} else {
// If there are reducers then the entire attempt's progress will be
// split between the map phase (67%) and the sort phase (33%).
mapPhase = getProgress().addPhase("map", 0.667f);
// Register the sort phase for progress reporting
sortPhase = getProgress().addPhase("sort", 0.333f);
}
}
// Start the reporter
TaskReporter reporter = startReporter(umbilical);
boolean useNewApi = job.getUseNewMapper();
// Initialize the task
initialize(job, getJobID(), reporter, useNewApi);
// check if it is a cleanupJobTask
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
}
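// Check whether the new API is in use
if (useNewApi) {
// Runs the user's own Mapper class
// Arguments: job configuration, split meta-info, umbilical, reporter
runNewMapper(job, splitMetaInfo, umbilical, reporter);
} else {
// Runs the user's own Mapper class (old API)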
runOldMapper(job, splitMetaInfo, umbilical, reporter);
}
done(umbilical, reporter);
}
...
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException,
InterruptedException {
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
getTaskID(),
reporter);
// make a mapper
// The mapper is instantiated by reflection;
// here taskContext.getMapperClass() is class com.zhaotao.hadoop.mr.MyMaxTempMapper
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
// make the input format
// The input format object is also obtained by reflection
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
// rebuild the input split
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset());
LOG.info("Processing split: " + split);
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE>
(split, inputFormat, reporter, taskContext);
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
org.apache.hadoop.mapreduce.RecordWriter output = null;
// get an output object
// Check whether there are any reduce tasks
if (job.getNumReduceTasks() == 0) {
output =
new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
// Create an output collector; it backs the Context object used in the user's overridden map() method
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
// Put the output collector into a MapContextImpl to obtain the MapContext object
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
mapContext =
new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
input, output,
committer,
reporter, split);
// Wrap mapContext to obtain mapperContext, of the Context type that Mapper defines
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
mapperContext =
new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
mapContext);
try {
// Initialize the input first
input.initialize(split, mapperContext);
// For the details, see the run() method of the Mapper class
mapper.run(mapperContext);
mapPhase.complete();
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
input = null;
output.close(mapperContext);
output = null;
} finally {
closeQuietly(input);
closeQuietly(output, mapperContext);
}
}
...
}
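runNewMapper() creates the user's mapper and input format by reflection from a Class object held in the job configuration. The sketch below illustrates the idea with plain java.lang.reflect; it is not the implementation of ReflectionUtils.newInstance(), and LineMapper, UpperCaseMapper, and newInstance are hypothetical names.

```java
import java.lang.reflect.Constructor;
import java.util.Locale;

final class ReflectiveMapperSketch {
    /** Hypothetical, much simplified mapper contract. */
    interface LineMapper {
        String map(String line);
    }

    /** Plays the role of the user's mapper class named in the job configuration. */
    static final class UpperCaseMapper implements LineMapper {
        @Override
        public String map(String line) {
            return line.toUpperCase(Locale.ROOT);
        }
    }

    /** Creates an instance through the no-argument constructor. */
    static <T> T newInstance(Class<T> cls) {
        try {
            Constructor<T> ctor = cls.getDeclaredConstructor();
            ctor.setAccessible(true); // allow non-public constructors
            return ctor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + cls.getName(), e);
        }
    }

    public static void main(String[] args) {
        // The framework only knows the class object, not the concrete type.
        Class<? extends LineMapper> configured = UpperCaseMapper.class;
        LineMapper mapper = newInstance(configured);
        System.out.println(mapper.map("hello"));
    }
}
```

This is why a user-defined mapper needs a no-argument constructor: the framework never calls a constructor it knows by name, it only asks the class for a new instance.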
org.apache.hadoop.mapreduce.Mapper
Mapper{
...
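// A map task runs in three stages:
// a. setup
// b. repeated calls to map() (the only stage the user defines); in Java this is a typical callback, where the framework calls back into the overridden method
// c. cleanup
// A Context is passed in as the argument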
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
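// Loop, calling map() (the user's overridden method) once per record, e.g. once per line of text
// Check whether another key/value pair exists
while (context.nextKeyValue()) {
// If so, pass the current key, the current value, and the context to map()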
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
// Finally, clean up
cleanup(context);
}
}
...
}
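The three stages of Mapper.run() follow the template-method pattern: the framework owns run(), and the user only overrides map(). The sketch below reproduces that shape with hypothetical classes (SketchMapper, LengthMapper) and a plain Iterator standing in for the Context.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class MapperRunSketch {
    /** Hypothetical mini-mapper: run() is fixed, map() is the callback. */
    static class SketchMapper {
        final List<String> calls = new ArrayList<>();

        void setup() {
            calls.add("setup");
        }

        void map(String record) {
            calls.add("map:" + record);
        }

        void cleanup() {
            calls.add("cleanup");
        }

        /** Same shape as Mapper.run(): setup, one map() call per record, cleanup in finally. */
        final void run(Iterator<String> records) {
            setup();
            try {
                while (records.hasNext()) {
                    map(records.next());
                }
            } finally {
                cleanup();
            }
        }
    }

    /** The "user" mapper: overrides only map(). */
    static class LengthMapper extends SketchMapper {
        @Override
        void map(String record) {
            calls.add(record + "=" + record.length());
        }
    }

    /** Runs a LengthMapper over the records and returns the sequence of calls. */
    static List<String> trace(List<String> records) {
        SketchMapper mapper = new LengthMapper();
        mapper.run(records.iterator());
        return mapper.calls;
    }

    public static void main(String[] args) {
        System.out.println(trace(Arrays.asList("a", "bcd")));
    }
}
```

Because cleanup() sits in a finally block, it runs even when map() throws, which matches the structure of the real run() method.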
org.apache.hadoop.mapreduce.lib.map.WrappedMapper
WrappedMapper{
...
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
// Advance to the next key/value pair; returns whether one exists
return mapContext.nextKeyValue();
}
@Override
public Counter getCounter(Enum<?> counterName) {
// Return the counter with the given name
return mapContext.getCounter(counterName);
}
...
}
org.apache.hadoop.mapred.ReduceTask
ReduceTask{
...
@Override
@SuppressWarnings("unchecked")
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
// Check whether this is a map or reduce task
if (isMapOrReduce()) {
// Add a copy phase
copyPhase = getProgress().addPhase("copy");
// Add a sort phase
sortPhase = getProgress().addPhase("sort");
// Add a reduce phase
reducePhase = getProgress().addPhase("reduce");
}
// start thread that will handle communication with parent
TaskReporter reporter = startReporter(umbilical);
// Check whether the new Reducer API is in use
boolean useNewApi = job.getUseNewReducer();
// Initialize the task with the values above
initialize(job, getJobID(), reporter, useNewApi);
// check if it is a cleanupJobTask
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
}
// Initialize the codec
// Check whether the map output is compressed
codec = initCodec();
// An iterator over raw key/value pairs
RawKeyValueIterator rIter = null;
// The shuffle plugin
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
Class combinerClass = conf.getCombinerClass();
CombineOutputCollector combineCollector =
(null != combinerClass) ?
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
Class<? extends ShuffleConsumerPlugin> clazz =
job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
// Build the context for the shuffle that precedes the reduce
ShuffleConsumerPlugin.Context shuffleContext =
new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec,
combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter,
shuffledMapsCounter,
reduceShuffleBytes, failedShuffleCounter,
mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this,
mapOutputFile, localMapFiles);
// Initialize the shuffle plugin
shuffleConsumerPlugin.init(shuffleContext);
// Run the shuffle; the returned rIter is a MergeQueue
rIter = shuffleConsumerPlugin.run();
// free up the data structures
mapOutputFilesOnDisk.clear();
sortPhase.complete(); // sort is complete
setPhase(TaskStatus.Phase.REDUCE);
statusUpdate(umbilical);
Class keyClass = job.getMapOutputKeyClass();
Class valueClass = job.getMapOutputValueClass();
RawComparator comparator = job.getOutputValueGroupingComparator();
// Check whether the new reduce API is in use
if (useNewApi) {
// Run the reduce task
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
} else {
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
shuffleConsumerPlugin.close();
done(umbilical, reporter);
}
...
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewReducer(JobConf job,
final TaskUmbilicalProtocol umbilical,
final TaskReporter reporter,
RawKeyValueIterator rIter,
RawComparator<INKEY> comparator,
Class<INKEY> keyClass,
Class<INVALUE> valueClass
) throws IOException,InterruptedException,
ClassNotFoundException {
// wrap value iterator to report progress.
final RawKeyValueIterator rawIter = rIter;
rIter = new RawKeyValueIterator() {
public void close() throws IOException {
rawIter.close();
}
public DataInputBuffer getKey() throws IOException {
return rawIter.getKey();
}
public Progress getProgress() {
return rawIter.getProgress();
}
public DataInputBuffer getValue() throws IOException {
return rawIter.getValue();
}
public boolean next() throws IOException {
boolean ret = rawIter.next();
reporter.setProgress(rawIter.getProgress().getProgress());
return ret;
}
};
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
getTaskID(), reporter);
// make a reducer
// The reducer is instantiated by reflection;
// here taskContext.getReducerClass() is com.zhaotao.hadoop.mr.MyMaxTempReducer
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =
new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
job.setBoolean("mapred.skip.on", isSkipping());
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
org.apache.hadoop.mapreduce.Reducer.Context
reducerContext = createReduceContext(reducer, job, getTaskID(),
rIter, reduceInputKeyCounter,
reduceInputValueCounter,
trackedRW,
committer,
reporter, comparator, keyClass,
valueClass);
try {
// Enter and run the reducer
reducer.run(reducerContext);
} finally {
trackedRW.close(reducerContext);
}
}
...
}
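At the start of runNewReducer(), the raw iterator is wrapped in an anonymous class so that every call to next() also reports progress. The same decorator idea can be sketched on a plain java.util.Iterator; withProgress, consumeAll, and the IntConsumer reporter are hypothetical names, and the sketch reports a simple item count rather than Hadoop's Progress value.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.IntConsumer;

final class ProgressIteratorSketch {
    /** Wraps an iterator so that each next() also reports how many items were consumed. */
    static <T> Iterator<T> withProgress(Iterator<T> raw, IntConsumer reporter) {
        return new Iterator<T>() {
            private int consumed = 0;

            @Override
            public boolean hasNext() {
                return raw.hasNext(); // pure delegation
            }

            @Override
            public T next() {
                T item = raw.next();
                reporter.accept(++consumed); // the only added behaviour
                return item;
            }
        };
    }

    /** Drains the wrapped iterator and returns the last progress value that was reported. */
    static int consumeAll(List<String> items) {
        int[] lastReported = {0};
        Iterator<String> it = withProgress(items.iterator(), n -> lastReported[0] = n);
        while (it.hasNext()) {
            it.next();
        }
        return lastReported[0];
    }

    public static void main(String[] args) {
        System.out.println("last reported: " + consumeAll(Arrays.asList("a", "b", "c")));
    }
}
```

The consumer of the iterator does not need to know about progress reporting at all, which is why the reducer code can stay unchanged.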
org.apache.hadoop.mapreduce.Reducer
Reducer{
...
// Runs in three stages, just like the run() method of Mapper
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
...
}
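Reducer.run() calls reduce() once per key, handing it all values that share that key. The sketch below shows how such grouping can work over input that is already sorted by key. The Pair class is hypothetical, and taking the maximum is an assumption suggested by the name MyMaxTempReducer; the article does not show the reducer's body.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ReducerRunSketch {
    /** A key/value record; the input list is assumed to be sorted by key already. */
    static final class Pair {
        final String key;
        final int value;

        Pair(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Assumed reduce logic: the maximum of all values for one key. */
    static int reduce(String key, List<Integer> values) {
        int max = Integer.MIN_VALUE;
        for (int v : values) {
            max = Math.max(max, v);
        }
        return max;
    }

    /** Groups consecutive records with equal keys and calls reduce() once per group. */
    static Map<String, Integer> run(List<Pair> sorted) {
        Map<String, Integer> output = new LinkedHashMap<>();
        int i = 0;
        while (i < sorted.size()) { // analogue of context.nextKey()
            String key = sorted.get(i).key;
            List<Integer> values = new ArrayList<>();
            while (i < sorted.size() && sorted.get(i).key.equals(key)) {
                values.add(sorted.get(i).value);
                i++;
            }
            output.put(key, reduce(key, values));
        }
        return output;
    }

    public static void main(String[] args) {
        List<Pair> sorted = Arrays.asList(
            new Pair("1950", 0), new Pair("1950", 22),
            new Pair("1951", 30), new Pair("1951", -5));
        System.out.println(run(sorted));
    }
}
```

The grouping only works because equal keys are adjacent, which is what the shuffle and sort before the reduce phase guarantee.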
Summary of the local job submission flow
job.submit()
--> JobSubmitter.submitJobInternal()
--> LocalJobRunner.submitJob(..)
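--> Convert the Job into a LocalJobRunner.Job object (a thread) and start it
new LocalJobRunner.Job(...){
...
this.start(); // start the job thread
}
--> LocalJobRunner.Job.run()
1. Build the mapRunnables list // the number of maps depends on the number of splits
2. runTasks(mapRunnables)
3. Build the reduceRunnables list // the number of reduces is set by the user
4. runTasks(reduceRunnables)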
--> runTasks(...)
for(Runnable r : runnables){
service.submit(r);
}
--> MapTaskRunnable.run() \ ReduceTaskRunnable.run()
{
MapTask task = new MapTask(); \ ReduceTask task = new ReduceTask();
task.run();
}
--> MapTask.run(){
MyMaxTempMapper.run(){
setup();
while(){
map(...);
}
cleanup();
}
}
ReduceTask.run(){
MyMaxTempReducer.run(){
setup();
while(){
reduce(...);
}
cleanup();
}
}