MapReduce source code analysis of job submission, initialization, assignment, and computation: initialization

Time: 2022-05-25 21:54:06

Job initialization

The submission part of this series ended with the Client calling the JobTracker's submitJob method through a remote RPC call. MapReduce job initialization starts from that point.

 

The submitJob method in JobTracker:

public synchronized JobStatus submitJob(JobID jobId) throws IOException {
    if(jobs.containsKey(jobId)) {
      //job already running, don't start twice
      return jobs.get(jobId).getStatus();
    }
    
    // Create a JobInProgress object that tracks the job's runtime information
    JobInProgress job = new JobInProgress(jobId, this, this.conf);
    
    String queue = job.getProfile().getQueueName();
    if(!(queueManager.getQueues().contains(queue))) {      
      new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId));
      throw new IOException("Queue \"" + queue + "\" does not exist");        
    }

    // check for access
    // (verify that the user may submit jobs to the specified queue)
    try {
      checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
    } catch (IOException ioe) {
      LOG.warn("Access denied for user " + job.getJobConf().getUser() 
               + ". Ignoring job " + jobId, ioe);
      new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
      throw ioe;
    }

    // Check the job if it cannot run in the cluster because of invalid memory
    // requirements.
    // Users set per-task memory with mapred.job.map.memory.mb and
    // mapred.job.reduce.memory.mb; administrators cap it with
    // mapred.cluster.max.map.memory.mb and mapred.cluster.max.reduce.memory.mb.
    // A job that exceeds the cap fails at submission.
    try {
      checkMemoryRequirements(job);
    } catch (IOException ioe) {
      new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
      throw ioe;
    }

    // Notify the task scheduler's listeners: the job is added to the job
    // queue and is also initialized
    return addJob(jobId, job);
}
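The memory check is a plain comparison between the per-task memory a job asks for and the cluster's cap. The following is a minimal sketch under stated assumptions, not Hadoop's checkMemoryRequirements: MemoryCheckSketch, checkMemory and isRejected are hypothetical names, values are passed in as plain megabyte numbers instead of being read from JobConf, and a negative cluster limit is assumed to mean "not configured".

```java
import java.io.IOException;

// Hypothetical sketch of the per-task memory check described above;
// it is NOT Hadoop's checkMemoryRequirements, only the same comparison.
class MemoryCheckSketch {

    /**
     * Throws if the job asks for more memory per task than the cluster allows.
     * Assumption: a negative cluster limit means "not configured, skip the check".
     *
     * @param jobMapMb    per-map memory requested (mapred.job.map.memory.mb)
     * @param jobReduceMb per-reduce memory requested (mapred.job.reduce.memory.mb)
     * @param maxMapMb    cluster cap (mapred.cluster.max.map.memory.mb)
     * @param maxReduceMb cluster cap (mapred.cluster.max.reduce.memory.mb)
     */
    static void checkMemory(long jobMapMb, long jobReduceMb,
                            long maxMapMb, long maxReduceMb) throws IOException {
        if (maxMapMb < 0 || maxReduceMb < 0) {
            return; // assumed "limits not configured" case
        }
        if (jobMapMb > maxMapMb || jobReduceMb > maxReduceMb) {
            throw new IOException("Job requests map=" + jobMapMb + "MB, reduce="
                    + jobReduceMb + "MB; cluster maximum is map=" + maxMapMb
                    + "MB, reduce=" + maxReduceMb + "MB");
        }
    }

    /** Returns true if the job would be rejected at this step. */
    static boolean isRejected(long jobMapMb, long jobReduceMb,
                              long maxMapMb, long maxReduceMb) {
        try {
            checkMemory(jobMapMb, jobReduceMb, maxMapMb, maxReduceMb);
            return false;
        } catch (IOException e) {
            return true;
        }
    }
}
```

In submitJob, a rejection at this step is handled like a failed queue or ACL check: the job's system directory is queued for cleanup and the IOException is rethrown to the client.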

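addJob then walks the JobTracker's listener list and calls each listener's jobAdded method. This wakes up EagerTaskInitializationListener, whose initialization thread is coordinated through wait/notify; it calls JobTracker.initJob, which in turn calls JobInProgress.initTasks. Inside JobInProgress, a TaskInProgress is created for every map and reduce task and kept in caches such as nonRunningMapCache, ...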
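nonRunningMapCache groups the not-yet-running map tasks by the network-topology nodes that hold their input data, so the scheduler can prefer data-local work. The sketch below is a simplified, hypothetical two-level version (host, then rack) of what createCache(splits, maxLevel) builds, as I understand it. It assumes a fixed host-to-rack map instead of Hadoop's topology resolution, and stores map-task indices instead of TaskInProgress objects; LocalityCacheSketch and build are made-up names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified locality cache: maps each host and each rack to the
// indices of map tasks whose input split has a replica there.
class LocalityCacheSketch {

    // splitHosts[i] lists the hosts storing split i (one split per map task).
    static Map<String, List<Integer>> build(String[][] splitHosts,
                                            Map<String, String> hostToRack) {
        Map<String, List<Integer>> cache = new LinkedHashMap<>();
        for (int mapIndex = 0; mapIndex < splitHosts.length; mapIndex++) {
            for (String host : splitHosts[mapIndex]) {
                // Level 1: the host holding a replica of this split.
                addOnce(cache, host, mapIndex);
                // Level 2: the rack containing that host (assumed static mapping).
                String rack = hostToRack.getOrDefault(host, "/default-rack");
                addOnce(cache, rack, mapIndex);
            }
        }
        return cache;
    }

    // Adds mapIndex to the node's list unless it is already there.
    private static void addOnce(Map<String, List<Integer>> cache,
                                String node, int mapIndex) {
        List<Integer> tasks = cache.computeIfAbsent(node, k -> new ArrayList<>());
        if (!tasks.contains(mapIndex)) {
            tasks.add(mapIndex);
        }
    }
}
```

A scheduler can then look up a TaskTracker's host first and its rack second, which is the idea behind data-local and rack-local map assignment.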

 

Inside the addJob method:

  private synchronized JobStatus addJob(JobID jobId, JobInProgress job) {
    totalSubmissions++;

    synchronized (jobs) {
      synchronized (taskScheduler) {
        jobs.put(job.getProfile().getJobID(), job);

        /*
         * Observer design pattern.
         * jobInProgressListeners holds the observers, i.e. the objects to be
         * notified:
         *   private final List<JobInProgressListener> jobInProgressListeners =
         *       new CopyOnWriteArrayList<JobInProgressListener>();
         * JobTracker adds entries to it through addJobInProgressListener.
         */
        for (JobInProgressListener listener : jobInProgressListeners) { 
          try {
            // Each listener was registered by the task scheduler
            // (JobQueueTaskScheduler), so this calls back into the scheduler:
            // the observer pattern at work
            listener.jobAdded(job);
          } catch (IOException ioe) {
            LOG.warn("Failed to add and so skipping the job : "
                + job.getJobID() + ". Exception : " + ioe);
          }
        }
      }
    }
    myInstrumentation.submitJob(job.getJobConf(), jobId);
    return job.getStatus();
  }
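The notification loop in addJob is a textbook observer. The sketch below keeps only that loop; JobListenerSketch and ListenerRegistrySketch are hypothetical stand-ins for JobInProgressListener and JobTracker, and jobs are represented by their ID strings. It shows two properties of the original loop: listeners are called in registration order, and a listener that throws IOException is logged and skipped without stopping the others.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for JobInProgressListener (takes a job ID, not a JobInProgress).
interface JobListenerSketch {
    void jobAdded(String jobId) throws IOException;
}

// Minimal observer-pattern sketch of the notification loop in addJob.
class ListenerRegistrySketch {
    // Iterating a CopyOnWriteArrayList works on a snapshot, so listeners can be
    // registered while a notification loop is running.
    private final List<JobListenerSketch> listeners = new CopyOnWriteArrayList<>();
    // Collected warnings (a plain list; this sketch is single-threaded).
    final List<String> warnings = new ArrayList<>();

    // Counterpart of addJobInProgressListener.
    void addJobListener(JobListenerSketch listener) {
        listeners.add(listener);
    }

    // Counterpart of the loop in addJob: a failing listener is logged and skipped.
    void addJob(String jobId) {
        for (JobListenerSketch listener : listeners) {
            try {
                listener.jobAdded(jobId);
            } catch (IOException ioe) {
                warnings.add("Failed to add and so skipping the job : " + jobId
                        + ". Exception : " + ioe.getMessage());
            }
        }
    }
}
```

Registration order matters for the scheduler start-up described below: the queue listener is registered before the initialization listener, so a job enters the job queue before its initialization is triggered.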

 

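With JobQueueTaskScheduler, listener.jobAdded calls JobQueueJobInProgressListener's jobAdded method. When is the JobQueueJobInProgressListener object handed to the JobTracker? When the task scheduler starts. And where does the task scheduler come from? When the JobTracker starts up, it constructs the scheduler named by mapred.jobtracker.taskScheduler.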
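Choosing the scheduler from mapred.jobtracker.taskScheduler amounts to a class-name lookup plus reflective instantiation. This sketch uses plain java.lang reflection and a Map in place of Hadoop's Configuration; SchedulerSketch, FifoSchedulerSketch (standing in for JobQueueTaskScheduler, the scheduler this article follows) and OtherSchedulerSketch are hypothetical.

```java
import java.util.Map;

// Hypothetical stand-in for the TaskScheduler base class.
abstract class SchedulerSketch {
    abstract String name();
}

// Plays the role of JobQueueTaskScheduler, used when nothing is configured.
class FifoSchedulerSketch extends SchedulerSketch {
    @Override String name() { return "fifo"; }
}

// Plays the role of an alternative scheduler named in the configuration.
class OtherSchedulerSketch extends SchedulerSketch {
    @Override String name() { return "other"; }
}

class SchedulerFactorySketch {
    static final String KEY = "mapred.jobtracker.taskScheduler";

    // Instantiates the class named by KEY (default: FifoSchedulerSketch)
    // through its no-arg constructor.
    static SchedulerSketch create(Map<String, String> conf) {
        String className = conf.getOrDefault(KEY, FifoSchedulerSketch.class.getName());
        try {
            return Class.forName(className)
                    .asSubclass(SchedulerSketch.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot create scheduler " + className, e);
        }
    }
}
```

Making the scheduler pluggable this way is what lets a cluster swap in a different scheduler purely through configuration.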

 

Starting the JobQueueTaskScheduler

jobQueueJobInProgressListener is the first listener added to the JobTracker's listener list, so a new job is put into the job queue before it is initialized.

  public synchronized void start() throws IOException {
    super.start();
    // taskTrackerManager is actually the JobTracker object. Registering
    // jobQueueJobInProgressListener with it is why listener.jobAdded(job)
    // above calls jobQueueJobInProgressListener's method.
    taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
    eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
    eagerTaskInitializationListener.start();
    // eagerTaskInitializationListener initializes jobs: it calls the
    // JobTracker's initJob method, which calls JobInProgress.initTasks
    taskTrackerManager.addJobInProgressListener(
        eagerTaskInitializationListener);
  }

The jobAdded method of jobQueueJobInProgressListener:
public void jobAdded(JobInProgress job) {
    jobQueue.put(new JobSchedulingInfo(job.getStatus()), job);
  }
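jobQueue keeps jobs sorted, so the scheduler can walk it in scheduling order. The sketch below models it as a TreeMap with a comparator; the ordering (priority, then start time, then job ID) is my assumption about what JobSchedulingInfo's FIFO ordering does, and SchedulingKeySketch and JobQueueSketch are hypothetical names.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified counterpart of JobSchedulingInfo: the job queue's sort key.
final class SchedulingKeySketch {
    final int priority;    // assumption: smaller value = higher priority
    final long startTime;  // job start time in milliseconds
    final String jobId;    // final tie-breaker so distinct jobs never collide

    SchedulingKeySketch(int priority, long startTime, String jobId) {
        this.priority = priority;
        this.startTime = startTime;
        this.jobId = jobId;
    }
}

class JobQueueSketch {
    // Assumed ordering: priority, then start time, then job ID.
    static final Comparator<SchedulingKeySketch> FIFO =
            Comparator.<SchedulingKeySketch>comparingInt(k -> k.priority)
                    .thenComparingLong(k -> k.startTime)
                    .thenComparing(k -> k.jobId);

    // A sorted map, so iterating values() yields jobs in scheduling order.
    final Map<SchedulingKeySketch, String> jobQueue = new TreeMap<>(FIFO);

    // Counterpart of jobQueue.put(new JobSchedulingInfo(job.getStatus()), job).
    void jobAdded(int priority, long startTime, String jobId) {
        jobQueue.put(new SchedulingKeySketch(priority, startTime, jobId), jobId);
    }
}
```

Within one priority level this degenerates to plain FIFO by start time.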


The scheduler above is constructed when the JobTracker starts:
  public static JobTracker startTracker(JobConf conf, String identifier) 
  throws IOException, InterruptedException {
    JobTracker result = null;
    while (true) {
      try {
        // The JobTracker constructor creates the scheduler named by
        // mapred.jobtracker.taskScheduler
        result = new JobTracker(conf, identifier);
        // Hand the JobTracker instance to the taskScheduler
        result.taskScheduler.setTaskTrackerManager(result);
        ....

 

JobTracker's main method calls startTracker:

 

  public static void main(String argv[]
                          ) throws IOException, InterruptedException {
    ....
    try {
      JobTracker tracker = startTracker(new JobConf());
      // offerService starts the JobTracker's service threads, including
      // interTrackerServer, an org.apache.hadoop.ipc.Server object whose inner
      // Listener class runs as a daemon and accepts connections using NIO.
      // The web UI is served by
      //   infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort,
      //       tmpInfoPort == 0, conf);
      // an org.apache.hadoop.http.HttpServer.
      tracker.offerService();
      ....

What HttpServer does (from its class Javadoc):

/**
 * Create a Jetty embedded server to answer http requests. The primary goal
 * is to serve up status information for the server.
 * There are three contexts:
 *   "/logs/" -> points to the log directory
 *   "/static/" -> points to common static files (src/webapps/static)
 *   "/" -> the jsp server code from (src/webapps/<name>)
 */
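Routing a request to one of those three contexts comes down to picking the longest context path that prefixes the request path, which is how servlet containers such as Jetty select a context. The sketch models only that rule; ContextResolverSketch is hypothetical and uses no Jetty API.

```java
// Hypothetical model of routing a request path to one of the three contexts
// listed in HttpServer's Javadoc: the longest matching prefix wins.
class ContextResolverSketch {
    static final String[] CONTEXT_PATHS = {
        "/logs/",   // log directory
        "/static/", // common static files (src/webapps/static)
        "/"         // JSP pages from src/webapps/<name>
    };

    // Returns the context path that is the longest prefix of requestPath,
    // or null if the path does not even start with "/".
    static String resolve(String requestPath) {
        String best = null;
        for (String prefix : CONTEXT_PATHS) {
            boolean longer = best == null || prefix.length() > best.length();
            if (requestPath.startsWith(prefix) && longer) {
                best = prefix;
            }
        }
        return best;
    }
}
```

Because "/" matches every path, it acts as the fallback, so the JobTracker's JSP status pages live under it.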

 

 

Job initialization

The scheduler:

Each Listener is registered with the JobTracker, and JobTracker's submitJob (through addJob) calls each Listener's jobAdded method.

    eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
    eagerTaskInitializationListener.start();
    // eagerTaskInitializationListener initializes jobs: it calls the
    // JobTracker's initJob method, which calls JobInProgress.initTasks
    taskTrackerManager.addJobInProgressListener(
        eagerTaskInitializationListener);

In detail, jobAdded triggers initJob:

 

The EagerTaskInitializationListener class:

....

class JobInitManager implements Runnable {
   
    public void run() {
      JobInProgress job = null;
      while (true) {
        try {
          synchronized (jobInitQueue) {
            while (jobInitQueue.isEmpty()) {
              jobInitQueue.wait();
            }
            job = jobInitQueue.remove(0);
          }
          threadPool.execute(new InitJob(job));
        } catch (InterruptedException t) {
          LOG.info("JobInitManagerThread interrupted.");
          break;
        } 
      }
....

  class InitJob implements Runnable {
  
    private JobInProgress job;
    
    public InitJob(JobInProgress job) {
      this.job = job;
    }
    
    public void run() {
      ttm.initJob(job); // ttm here is the JobTracker
    }
  }

....

  public void jobAdded(JobInProgress job) {
    synchronized (jobInitQueue) {
      jobInitQueue.add(job);
      resortInitQueue();
      jobInitQueue.notifyAll();
    }
  }
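jobAdded and JobInitManager form a producer/consumer pair built on wait/notifyAll, with the slow initTasks work pushed onto a thread pool so that split computation does not block anyone. The following is a self-contained sketch of that hand-off, not the listener itself: InitQueueSketch and shutdownAndWait are hypothetical, jobs are ID strings, initJob is any callback, and resortInitQueue() is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of EagerTaskInitializationListener's hand-off: jobAdded()
// produces, a JobInitManager-style thread consumes, and a pool runs initJob.
class InitQueueSketch {
    private final List<String> jobInitQueue = new ArrayList<>();
    private final ExecutorService threadPool = Executors.newFixedThreadPool(2);
    private final Thread manager;

    InitQueueSketch(Consumer<String> initJob) {
        manager = new Thread(() -> {
            while (true) {
                String job;
                try {
                    synchronized (jobInitQueue) {
                        while (jobInitQueue.isEmpty()) {
                            jobInitQueue.wait(); // releases the lock until notifyAll()
                        }
                        job = jobInitQueue.remove(0);
                    }
                } catch (InterruptedException e) {
                    break; // interrupted while waiting on an empty queue: stop
                }
                // Plays the role of threadPool.execute(new InitJob(job)) -> ttm.initJob(job)
                threadPool.execute(() -> initJob.accept(job));
            }
            threadPool.shutdown(); // already-submitted initializations still run
        }, "JobInitManager");
        manager.start();
    }

    // Producer side, like jobAdded in the listener: enqueue, then wake the manager.
    void jobAdded(String job) {
        synchronized (jobInitQueue) {
            jobInitQueue.add(job);
            jobInitQueue.notifyAll();
        }
    }

    // Interrupts the manager and waits for pending initializations;
    // returns true if everything finished within the timeout.
    boolean shutdownAndWait(long timeoutMs) {
        manager.interrupt();
        try {
            manager.join(timeoutMs);
            return threadPool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because the manager only calls wait() when the queue is empty, interrupting it still lets it hand every queued job to the pool before it stops; that shutdown behavior belongs to this sketch, not necessarily to Hadoop.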

JobInProgress's initTasks method creates a set of TaskInProgress objects and caches them until task assignment sends them to the TaskTrackers:

  /**
   * Construct the splits, etc.  This is invoked from an async
   * thread so that split-computation doesn't block anyone.
   */
  public synchronized void initTasks() 
  throws IOException, KillInterruptedException {
    if (tasksInited.get() || isComplete()) {
      return;
    }
    synchronized(jobInitKillStatus){
      if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) {
        return;
      }
      jobInitKillStatus.initStarted = true;
    }

    LOG.info("Initializing " + jobId);

    // log job info
    // (jobFile here is the JobInProgress field; the local String declared
    // below shadows it from that point on)
    JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(), 
                                    this.startTime, hasRestarted());
    // log the job priority
    setPriority(this.priority);
    
    //
    // read input splits and create a map per a split
    //
    String jobFile = profile.getJobFile();

    Path sysDir = new Path(this.jobtracker.getSystemDir());
    FileSystem fs = sysDir.getFileSystem(conf);
    DataInputStream splitFile =
      fs.open(new Path(conf.get("mapred.job.split.file")));
    JobClient.RawSplit[] splits;
    try {
      splits = JobClient.readSplitFile(splitFile);
    } finally {
      splitFile.close();
    }
    numMapTasks = splits.length;

    // if the number of splits is larger than a configured value
    // then fail the job.
    int maxTasks = jobtracker.getMaxTasksPerJob();
    if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) {
      throw new IOException(
                "The number of tasks for this job " + 
                (numMapTasks + numReduceTasks) +
                " exceeds the configured limit " + maxTasks);
    }
    jobtracker.getInstrumentation().addWaiting(
        getJobID(), numMapTasks + numReduceTasks);

    maps = new TaskInProgress[numMapTasks];
    for(int i=0; i < numMapTasks; ++i) {
      inputLength += splits[i].getDataLength();
      maps[i] = new TaskInProgress(jobId, jobFile, 
                                   splits[i], 
                                   jobtracker, conf, this, i);
    }
    LOG.info("Input size for job " + jobId + " = " + inputLength
        + ". Number of splits = " + splits.length);
    if (numMapTasks > 0) { 
      nonRunningMapCache = createCache(splits, maxLevel);
    }
        
    // set the launch time
    this.launchTime = System.currentTimeMillis();

    //
    // Create reduce tasks
    //
    this.reduces = new TaskInProgress[numReduceTasks];
    for (int i = 0; i < numReduceTasks; i++) {
      reduces[i] = new TaskInProgress(jobId, jobFile, 
                                      numMapTasks, i, 
                                      jobtracker, conf, this);
      nonRunningReduces.add(reduces[i]);
    }

    // Calculate the minimum number of maps to be complete before 
    // we should start scheduling reduces
    completedMapsForReduceSlowstart = 
      (int)Math.ceil(
          (conf.getFloat("mapred.reduce.slowstart.completed.maps", 
                         DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) * 
           numMapTasks));

    // create two cleanup tips, one map and one reduce.
    cleanup = new TaskInProgress[2];

    // cleanup map tip. This map doesn't use any splits. Just assign an empty
    // split.
    JobClient.RawSplit emptySplit = new JobClient.RawSplit();
    cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
            jobtracker, conf, this, numMapTasks);
    cleanup[0].setJobCleanupTask();

    // cleanup reduce tip.
    cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                       numReduceTasks, jobtracker, conf, this);
    cleanup[1].setJobCleanupTask();

    // create two setup tips, one map and one reduce.
    setup = new TaskInProgress[2];

    // setup map tip. This map doesn't use any split. Just assign an empty
    // split.
    setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
            jobtracker, conf, this, numMapTasks + 1 );
    setup[0].setJobSetupTask();

    // setup reduce tip.
    setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                       numReduceTasks + 1, jobtracker, conf, this);
    setup[1].setJobSetupTask();
    
    synchronized(jobInitKillStatus){
      jobInitKillStatus.initDone = true;
      if(jobInitKillStatus.killed) {
        throw new KillInterruptedException("Job " + jobId + " killed in init");
      }
    }
    
    tasksInited.set(true);
    JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, 
                                 numMapTasks, numReduceTasks);
  }
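The arithmetic in initTasks is easy to check in isolation. The sketch below pulls out three calculations under hypothetical names: the maxTasks check (a non-positive limit disables it), the slow-start threshold derived from mapred.reduce.slowstart.completed.maps, and the partition numbers given to the cleanup and setup TIPs, which sit just past the real map and reduce indices. The slow-start fraction is passed in rather than read from a JobConf; 0.05 in the test is only an example input.

```java
// Hypothetical helpers that isolate three calculations from initTasks().
class InitTasksMathSketch {

    // Same condition as the maxTasks check: a non-positive limit disables it.
    static boolean exceedsTaskLimit(int numMaps, int numReduces, int maxTasksPerJob) {
        return maxTasksPerJob > 0 && numMaps + numReduces > maxTasksPerJob;
    }

    // Maps that must complete before reduces are scheduled: ceil(fraction * numMaps),
    // computed in float like conf.getFloat(...) * numMapTasks.
    static int completedMapsForReduceSlowstart(float slowstartFraction, int numMaps) {
        return (int) Math.ceil(slowstartFraction * numMaps);
    }

    // Partition numbers given to the special TIPs, in the order
    // {cleanup map, cleanup reduce, setup map, setup reduce}.
    static int[] specialTaskPartitions(int numMaps, int numReduces) {
        return new int[] {numMaps, numReduces, numMaps + 1, numReduces + 1};
    }
}
```

For example, with 30 maps and a slow-start fraction of 0.05, reduces become schedulable once ceil(1.5) = 2 maps have completed.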