MapReduce调度与执行原理之作业初始化

时间:2022-09-25 08:19:32

前言 :本文旨在理清在Hadoop中一个MapReduce作业(Job)在提交到框架后的整个生命周期过程,权作总结和日后参考,如有问题,请不吝赐教。本文不涉及Hadoop的架构设计,如有兴趣请参考相关书籍和文献。在梳 理过程中,我对一些感兴趣的源码也会逐行研究学习,以期强化基础。

作者
:Jaytalent

开始日期
:2013年9月9日

参考资料:【1】《Hadoop技术内幕--深入解析MapReduce架构设计与实现原理》董西成
                  【2】Hadoop 1.0.0 源码
                            【3】《Hadoop技术内幕--深入解析Hadoop Common和HDFS架构设计与实现原理》蔡斌 陈湘萍

上一篇文章中,作业准备提交到JobTracker了。本文关注作业在提交到JobTracker后且在执行前经历了哪些事情。
一个MapReduce作业的生命周期大体分为5个阶段
【1】:
1. 
作业提交与初始化
2. 任务调度与监控
3. 任务运行环境准备
4. 任务执行
5. 作业完成

一、作业提交与初始化


 JobClient.submitJobInternal方法中,最后一步就是将作业提交到JobTracker:
 status = jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials());

这个方法所属的接口在 上一篇文章中有所提及,其实现有两个:JobTracker和LocalRunner。LocalRunner是用于执行本地作业的,当Hadoop配置为本地模式时采用该类处理作业。我们关注JobTracker.submitJob方法。这里多说一句,在JobClient对象初始化时有如下代码:

 /**
* Connect to the default {@link JobTracker}.
* @param conf the job configuration.
* @throws IOException
*/
public void init(JobConf conf) throws IOException {
String tracker = conf.get("mapred.job.tracker", "local");
tasklogtimeout = conf.getInt(
TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);
this.ugi = UserGroupInformation.getCurrentUser();
if ("local".equals(tracker)) {
conf.setNumMapTasks(1);
this.jobSubmitClient = new LocalJobRunner(conf);
} else {
this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
}
}

可以看到,tracker变量是从mapred.job.tracker配置获取值的,默认值为字符串“local”。因此,如果没有在配置文件配置这个值或者JobConf对象中没有添加配置文件(通常为mapred-site.xml)资源,jobSubmitClient就会使用LocalJobRunner进行初始化。
回到正题,
JobTracker.submitJob方法会做如下工作:

a. 首先创建JobInProgress对象。这个是个非常重要的对象,它维护了作业的生命周期,可以跟踪作业的运行状态和进度。
    // Create the JobInProgress, do not lock the JobTracker since
// we are about to copy job.xml from HDFS
JobInProgress job = null;
try {
job = new JobInProgress(this, this.conf, jobInfo, 0, ts);
} catch (Exception e) {
throw new IOException(e);
}

我们可以进入构造函数内部看看JobInProgress都有什么内容。

this.jobtracker = jobtracker;
this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);

JobStatus对象也比较关键,它维护了作业的一些状态信息如作业优先级,开始时间,map和reduce任务的进度等。
其他信息在后面分析JobTracker实现的时候再详述。

b. 检查用户的作业提交权限。Hadoop以队列为单位管理作业和资源。一个用户可以属于一个或多个队列,管理员可以配置用户在某个队列中的作业提交权限。
      // check if queue is RUNNING
String queue = job.getProfile().getQueueName();
if (!queueManager.isRunning(queue)) {
throw new IOException("Queue \"" + queue + "\" is not running");
}
try {
aclsManager.checkAccess(job, ugi, Operation.SUBMIT_JOB);
} catch (IOException ioe) {
LOG.warn("Access denied for user " + job.getJobConf().getUser()
+ ". Ignoring job " + jobId, ioe);
job.fail();
throw ioe;
}

c. 检查作业的内存使用量。用户在配置文件中可以配置Map任务和Reduce任务的内存使用量,而这些值不能超过管理员所配置的最大使用量,否则作业提交就会失败。

      // Check the job if it cannot run in the cluster because of invalid memory
// requirements.
try {
checkMemoryRequirements(job);
} catch (IOException ioe) {
throw ioe;
}

d. 调用调度器模块,对作业进行初始化。具体过程如下

     // Submit the job
JobStatus status;
status = addJob(jobId, job);

在addJob方法中,作业首先被加入到已经提交的作业列表中,然后通知JobTracker所有的监听器对象,当前作业被提交,并采取相应的行动。

    synchronized (jobs) {
synchronized (taskScheduler) {
jobs.put(job.getProfile().getJobID(), job);
for (JobInProgressListener listener : jobInProgressListeners) {
listener.jobAdded(job);
}
}
}

其中,任务调度器taskScheduler对象是在JobTracker构造时创建出来的。调度器对象和JobTracker对象是互相包含的关系。

    // Create the scheduler
Class<? extends TaskScheduler> schedulerClass
= conf.getClass("mapred.jobtracker.taskScheduler",
JobQueueTaskScheduler.class, TaskScheduler.class);
taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);

可以看出,Hadoop任务调度器时可插拔的模块,调度器的类型通过配置文件获得,并使用反射机制实例化。用户可以通过继承TaskScheduler类实现自己的调度器(由于做研究需要,本人实现了一个简单的调度器,实现过程日后分享)。Hadoop默认的调度器为JobQueueTaskScheduler,调度策略为先进先出(FIFO)。

注意:JobTracker采用了典型的观察者模式。TaskScheduler为订阅者,JobTracker为发布者,二者通过作业监听器JobInProgressListener对象发生关系。当用户自定义了一个调度器后,同时要自定义监听器类。一个调度器对象可以包含若干个监听器,调度器在初始化时,会将其所有的监听器对象注册到JobTracker,订阅其发布的消息。以JobQueueTaskScheduler为例:
  public synchronized void start() throws IOException {
super.start();
taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
eagerTaskInitializationListener.start();
taskTrackerManager.addJobInProgressListener(
eagerTaskInitializationListener);
}

其中taskTrackerManager对象就是JobTracker,通过addJobInProgressListener方法注册监听器。这样,当有JobTracker发现有作业被提交、更新或删除时,就会通知订阅者TaskScheduler,并调用相应的回调函数,如上面提到的listener.jobAdded方法。更多调度器的细节请关注后续文章。

作业初始化的工作就是由上述的EagerTaskInitializationListener监听器对象实现的。在该监听器内部有一个作业初始化管理线程在运行,该线程访问一个初始化作业队列,取出一个作业,并新开一个作业初始化线程执行JobTracker.initJob方法。
初始化的过程主要是根据作业信息创建该作业的任务(Task)。作业的任务包括四种:
1. Setup Task。该任务进行一些简单工作,运行状态设置为setup。它在运行时会占用slot。Map和Reduce Setup任务各有一个。
    // create two setup tips, one map and one reduce.
setup = new TaskInProgress[2];
// setup map tip. This map doesn't use any split. Just assign an empty
// split.
setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
jobtracker, conf, this, numMapTasks + 1, 1);
setup[0].setJobSetupTask();
// setup reduce tip.
setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
numReduceTasks + 1, jobtracker, conf, this, 1);
setup[1].setJobSetupTask();
2. Map Task。Map阶段处理数据的任务。其创建过程如下:
    maps = new TaskInProgress[numMapTasks];
for(int i=0; i < numMapTasks; ++i) {
inputLength += splits[i].getInputDataLength();
maps[i] = new TaskInProgress(jobId, jobFile,
splits[i],
jobtracker, conf, this, i, numSlotsPerMap);
}

TaskInProgrees维护任务运行时信息,与JobInProgress类似。

3. Reduce Task。Reduce阶段处理数据的任务。其创建过程如下:
    this.reduces = new TaskInProgress[numReduceTasks];
for (int i = 0; i < numReduceTasks; i++) {
reduces[i] = new TaskInProgress(jobId, jobFile,
numMapTasks, i,
jobtracker, conf, this, numSlotsPerReduce);
nonRunningReduces.add(reduces[i]);
}

用户可以在配置文件中指定Reduce任务的个数。

4. Cleanup Task。 作业完成后完成一些清理工作的任务。清理包括删除临时目录,设置状态等操作。
    // create cleanup two cleanup tips, one map and one reduce.
cleanup = new TaskInProgress[2];
// cleanup map tip. This map doesn't use any splits. Just assign an empty
// split.
TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
jobtracker, conf, this, numMapTasks, 1);
cleanup[0].setJobCleanupTask();
// cleanup reduce tip.
cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
numReduceTasks, jobtracker, conf, this, 1);
cleanup[1].setJobCleanupTask();
至此,作业初始化工作完成。接下来,调度器会根据当前可用的slot资源,从队列中选择一个作业,进而选择该作业的一个任务,放到空闲slot上执行。四种任务执行的顺序为Setup、Map、Reduce和Cleanup。由于Reduce任务的输入依赖于Map任务的输出,因此Reduce任务通常延后开始,否则将闲置reduce slot。可以配置档Map任务进度大于mapred.reduce.slowstart.completed.maps时,Reduce任务才开始,该值默认为5%。Map和Reduce任务这种依赖性在任务调度器设计中时常考虑。例如Facebook在提出FairScheduler的论文中就试图解决这个问题。
另外,关于为什么JobTracker将初始化工作交给调度器处理,文献【1】给出的理由是:
  • 作业初始化后会占用内存资源,如果有大量初始化作业在JobTracker等待调度就会占用不必要的资源。在交给调度器后,Hadoop按照一定策略选择性地初始化以节省内存资源。
  • 只有经过初始化的作业才能得到调度,因此将初始化工作嵌入调度器中比较合理。
有关任务调度器内容详见下一篇文章: MapReduce调度与执行原理之任务调度

MapReduce调度与执行原理之作业初始化的更多相关文章

  1. MapReduce调度与执行原理之作业提交

    前言 :本文旨在理清在Hadoop中一个MapReduce作业(Job)在提交到框架后的整个生命周期过程,权作总结和日后参考,如有问题,请不吝赐教.本文不涉及Hadoop的架构设计,如有兴趣请参考相关 ...

  2. MapReduce调度与执行原理系列文章

    转自:http://blog.csdn.net/jaytalent?viewmode=contents MapReduce调度与执行原理系列文章 一.MapReduce调度与执行原理之作业提交 二.M ...

  3. MapReduce调度与执行原理之任务调度

    前言 :本文旨在理清在Hadoop中一个MapReduce作业(Job)在提交到框架后的整个生命周期过程,权作总结和日后参考,如有问题,请不吝赐教.本文不涉及Hadoop的架构设计,如有兴趣请参考相关 ...

  4. MapReduce调度与执行原理之任务调度(续)

    前言 :本文旨在理清在Hadoop中一个MapReduce作业(Job)在提交到框架后的整个生命周期过程,权作总结和日后参考,如有问题,请不吝赐教.本文不涉及Hadoop的架构设计,如有兴趣请参考相关 ...

  5. MapReduce on Yarn运行原理

    一.概念综述 MapReduce是一种可用于数据处理的编程模型(或计算模型),该模型可以比较简单,但想写出有用的程序却不太容易.MapReduce能将大型数据处理任务分解成很多单个的.可以在服务器集群 ...

  6. erlang虚拟机代码执行原理

     转载:http://blog.csdn.NET/mycwq/article/details/45653897 erlang是开源的,很多人都研究过源代码.但是,从erlang代码到c代码,这是个不小 ...

  7. springmvc执行原理及自定义mvc框架

    springmvc是spring的一部分,也是一个优秀的mvc框架,其执行原理如下: (1)浏览器提交请求经web容器(比如tomcat)转发到*调度器dispatcherServlet. (2)中 ...

  8. MapReduce调度器

    1. 先进先出(FIFO)调度器 先进先出调度器是Hadoop的默认调度器.就像这个名字所隐含的那样,这种调度器就是用简单按照“先到先得”的算法来调度任务的.例如,作业A和作业B被先后提交.那么在执行 ...

  9. Hadoop架构设计、执行原理具体解释

    1.Map-Reduce的逻辑过程 如果我们须要处理一批有关天气的数据.其格式例如以下: 依照ASCII码存储.每行一条记录 每一行字符从0開始计数,第15个到第18个字符为年 第25个到第29个字符 ...

随机推荐

  1. 在windows xp 平台上安装mvc4失败

    使用web 平台安装程序,在windows xp上安装mvc4 出现失败,需要主要是windows powershell 2.0安装失败,需要先卸载power shell 1.0或者 winowrm ...

  2. iTween visual Editor 0&period;6&period;1

    首先添加ITween Path编辑路径(无需路径运动的动画可忽略该步骤): 然后为需要添加动画的物体添加ITween Event脚本: 若是物体沿特定路径运动,则选中Path,并选择一个路径:  若想 ...

  3. 关于Java中equal 和 &equals;&equals; 的区别

    在对Java开发还不熟练的时候,往往很多人都喜欢用==去比较两个对象是否相等,有时候就会出现很奇葩的问题. 其实这类问题并不是奇葩问题,只是我们不够细心而已,在Java中“==”比较两个变量本身的值, ...

  4. 移除元素-leetcode-27

    class Solution {public:    int removeElement(vector<int>& nums, int val) {        if(nums. ...

  5. Qt库版查询

    1 背景 在为嵌入式产品开发Qt应用时,开发所使用的Qt库要和嵌入式系统所支持的Qt库版本一致,否则开发的App无法正确运行.那么,如何查询一个嵌入式系统中所安装Qt库的版本呢?下面将进行一些总结. ...

  6. 普通用户开放 sudo 权限

    大家都知道 linux 每个目录都是有权限的,所以如果要想在此目录下读写,则要有这个目录的权限,或者就是有 sudo 权限,那怎么给普通用户赋予 sudo 权限呢,下面我们来看一下: 1.先用 roo ...

  7. Xcode修改新建项目注释模板(作者和公司名等)

    我们新建项目后,每个页面头部都有一段注释说明, 如下: 如果我们想修改Created by XXX 和 Copyright 版权内容,该如何做呢? 1.对于修改作者:Created by xxx 这里 ...

  8. Codeforces 912 E&period;Prime Gift &lpar;折半枚举、二分&rpar;

    题目链接:Prime Gift 题意: 给出了n(1<=n<=16)个互不相同的质数pi(2<=pi<=100),现在要求第k大个约数全在所给质数集的数.(保证这个数不超过1e ...

  9. GitHub笔记(四)——标签管理

    五 标签管理 1 打标签.默认master $ git tag v1.0 要对add merge这次提交打标签,它对应的commit id是f52c633,敲入命令: $ git tag v0.9 f ...

  10. HDU 1754 I Hate It &lpar;线段树)

    题目链接 Problem Description 很多学校流行一种比较的习惯.老师们很喜欢询问,从某某到某某当中,分数最高的是多少. 这让很多学生很反感. 不管你喜不喜欢,现在需要你做的是,就是按照老 ...