Hadoop-MapReduce-MRAppMaster启动篇

时间:2024-01-27 11:34:53

 一、源码下载

下面是hadoop官方源码下载地址,我下载的是hadoop-3.2.4,那就一起来看下吧

Index of /dist/hadoop/core

二、上下文

在上一篇<Hadoop-MapReduce-源码跟读-客户端篇>中已经将到:作业提交到ResourceManager,那么对于该Job第一个容器(MRAppMaster)是怎么启动的呢?接下来我们一起来看看

三、结论

MRJobConfig是一个MRJob的配置,里面包含了Map、Reduce、Combine类以及Job名称、用户名称、队列名称、MapTask数量、ReduceTask数量、工作目录,jar在本地的路径、任务超时时间、任务id、输入输出目录,每个任务的内存大小和cpu核数等等。

此外它里面还有一个属性,如下:

package org.apache.hadoop.mapreduce;
public interface MRJobConfig {
    //......省略......

    public static final String APPLICATION_MASTER_CLASS =
          "org.apache.hadoop.mapreduce.v2.app.MRAppMaster";

    public static final String MAPREDUCE_V2_CHILD_CLASS = 
          "org.apache.hadoop.mapred.YarnChild";
    //......省略......
}

MRAppMaster是MapReduce的ApplicationMaster实现,负责整个MapReduce作业的过程调度和状态协调

YarnChid是运行在每个容器中的进程,负责运行某一个MapTask或者ReduceTask,

有兴趣的同学可以看一个任务的Yarn日志,也可以看我的<Hadoop-MapReduce-跟着日志理解整体流程>一篇中的日志,就可以发现ApplicationMaster容器和MapTask、ReduceTask所在容器的的日志开头分别就是MRAppMaster和YarnChid

MRAppMaster的启动参数是在YARNRunner中配置的:

public class YARNRunner implements ClientProtocol {
    private List<String> setupAMCommand(Configuration jobConf) {
    List<String> vargs = new ArrayList<>(8);
    vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
        + "/bin/java");

    //......省略......

    vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);

    //......省略......

    return vargs;
  }
}

YarnChid的启动参数是在MapReduceChildJVM中配置的:

public class MapReduceChildJVM {
  public static List<String> getVMCommand(
      InetSocketAddress taskAttemptListenerAddr, Task task, 
      JVMId jvmID) {

    TaskAttemptID attemptID = task.getTaskID();
    JobConf conf = task.conf;

    Vector<String> vargs = new Vector<String>(8);

    vargs.add(MRApps.crossPlatformifyMREnv(task.conf, Environment.JAVA_HOME)
        + "/bin/java");

    //......省略......

    vargs.add(YarnChild.class.getName());  // main of Child
    
    //......省略......

    return vargsFinal;
  }
}

YarnChid启动后会启动MapTask或者ReduceTask

四、调用细节(源码跟读)

我们接着上一篇<Hadoop-MapReduce-源码跟读-客户端篇>的源码开始分析,即:YARNRunner.submitJob()中的ApplicationSubmissionContext构建

1、YARNRunner

1.1、createApplicationSubmissionContext

//构建启动MR AM所需的所有信息
public ApplicationSubmissionContext createApplicationSubmissionContext(
      Configuration jobConf, String jobSubmitDir, Credentials ts)
      throws IOException {
    //获取applicationId (resMgrDelegate 是 YarnClient 的子类)
    //applicationId是应用程序的全局唯一标识符,通过使用集群时间戳(即ResourceManager的开始时间)以及应用程序的单调递增计数器来实现的。
    ApplicationId applicationId = resMgrDelegate.getApplicationId();

    //设置本地资源
    //LocalResource表示运行容器所需的本地资源
    //NodeManager负责在启动容器之前本地化资源
    //应用程序可以指定LocalResourceType和LocalResourceVisibility
    //LocalResourceType:
    //    1、FILE    : 常规文件,即不间断的字节
    //    2、ARCHIVE : 归档,由NodeManager自动取消归档
    //    3、PATTERN : 1和2的混合
    //LocalResourceVisibility:
    //    1、PUBLIC  :     由节点上的所有用户共享
    //    2、PRIVATE :     在节点上同一用户的所有应用程序之间共享
    //    3、APPLICATION : 仅在节点上的同一应用程序的容器之间共享。
    //该方法会设置job配置文件、job jar包的HDFS路径等,最后得到这样一个Map
    //    <"job.xml" , LocalResource>
    //    <"job.jar" , LocalResource>
    //    <"jobSubmitDir/job.split" , LocalResource>
    //    <"jobSubmitDir/job.splitmetainfo , LocalResource>
    Map<String, LocalResource> localResources =
        setupLocalResources(jobConf, jobSubmitDir);

    //设置安全令牌
    DataOutputBuffer dob = new DataOutputBuffer();
    ts.writeTokenStorageToStream(dob);
    ByteBuffer securityTokens =
        ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

    //为AM容器设置ContainerLaunchContext
    //ContainerLaunchContext表示NodeManager启动容器所需的所有信息,包括:
    //    1、ContainerId
    //    2、分配给容器的资源
    //    3、容器分配给的用户
    //    4、如果启用了安全性,还需要安全令牌
    //    5、我们上面设置的本地资源
    //    6、可选的、特定于应用程序的二进制服务数据
    //    7、已启动进程的环境变量
    //    8、启动容器的命令(里面包含了AM和Task所在容器的启动类,即结论中的MRAppMaster和YarnChild)
    //    9、容器失败退出时的重试策略
    //***********************************************
    //setupAMCommand方法会设置AM所在容器的启动命令参数,下面我们会展开看看命令是什么样的
    List<String> vargs = setupAMCommand(jobConf);
    ContainerLaunchContext amContainer = setupContainerLaunchContextForAM(
        jobConf, localResources, securityTokens, vargs);

    //设置RM用于续订令牌的配置
    String regex = conf.get(MRJobConfig.MR_JOB_SEND_TOKEN_CONF);
    if (regex != null && !regex.isEmpty()) {
      setTokenRenewerConf(amContainer, conf, regex);
    }


    Collection<String> tagsFromConf =
        jobConf.getTrimmedStringCollection(MRJobConfig.JOB_TAGS);

    //设置ApplicationSubmissionContext
    //ApplicationSubmissionContext表示ResourceManager为应用程序启动ApplicationMaster所需的所有信息,包括:
    //    1、ApplicationId
    //    2、Application用户
    //    3、Application名称
    //    4、Application属性
    //    5、执行ApplicationMaster的容器的ContainerLaunchContext,上面我们已经构建了
    //    6、maxAppAttempts。应用程序尝试的最大次数。它应该不大于YARN配置中最大尝试的全局次数。
    //    7、尝试失败有效性间隔。默认值为-1。当以毫秒为单位的attemptFailuresValidationInterval设置为>0时,故障数将不会将发生在validityInterval之外的故障计入故障计数。如果失败计数达到maxAppAttempts,则应用程序将失败。
    //    8、可选,特定于应用程序的LogAggregationContext(LogAggregationContext表示NodeManager处理应用程序日志所需的所有信息。)
    ApplicationSubmissionContext appContext =
        recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
    appContext.setApplicationId(applicationId);                // ApplicationId
    appContext.setQueue(                                       // Queue name
        jobConf.get(JobContext.QUEUE_NAME,
        YarnConfiguration.DEFAULT_QUEUE_NAME));
    // add reservationID if present
    ReservationId reservationID = null;
    try {
      reservationID =
          ReservationId.parseReservationId(jobConf
              .get(JobContext.RESERVATION_ID));
    } catch (NumberFormatException e) {
      // throw exception as reservationid as is invalid
      String errMsg =
          "Invalid reservationId: " + jobConf.get(JobContext.RESERVATION_ID)
              + " specified for the app: " + applicationId;
      LOG.warn(errMsg);
      throw new IOException(errMsg);
    }
    if (reservationID != null) {
      appContext.setReservationID(reservationID);
      LOG.info("SUBMITTING ApplicationSubmissionContext app:" + applicationId
          + " to queue:" + appContext.getQueue() + " with reservationId:"
          + appContext.getReservationID());
    }
    appContext.setApplicationName(                             // Job name
        jobConf.get(JobContext.JOB_NAME,
        YarnConfiguration.DEFAULT_APPLICATION_NAME));
    appContext.setCancelTokensWhenComplete(
        conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
    appContext.setAMContainerSpec(amContainer);         // AM Container
    appContext.setMaxAppAttempts(
        conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
            MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));

    // Setup the AM ResourceRequests
    List<ResourceRequest> amResourceRequests = generateResourceRequests();
    appContext.setAMContainerResourceRequests(amResourceRequests);

    // set labels for the AM container requests if present
    String amNodelabelExpression = conf.get(MRJobConfig.AM_NODE_LABEL_EXP);
    if (null != amNodelabelExpression
        && amNodelabelExpression.trim().length() != 0) {
      for (ResourceRequest amResourceRequest : amResourceRequests) {
        amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim());
      }
    }
    // set labels for the Job containers
    appContext.setNodeLabelExpression(jobConf
        .get(JobContext.JOB_NODE_LABEL_EXP));

    appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE);
    if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
      appContext.setApplicationTags(new HashSet<>(tagsFromConf));
    }

    String jobPriority = jobConf.get(MRJobConfig.PRIORITY);
    if (jobPriority != null) {
      int iPriority;
      try {
        iPriority = TypeConverter.toYarnApplicationPriority(jobPriority);
      } catch (IllegalArgumentException e) {
        iPriority = Integer.parseInt(jobPriority);
      }
      appContext.setPriority(Priority.newInstance(iPriority));
    }

    return appContext;
  }

1.2、setupAMCommand

private List<String> setupAMCommand(Configuration jobConf) {
    //命令参数长度为固定的8个
    List<String> vargs = new ArrayList<>(8);
    //$JAVA_HOME/bin/java 即java在本地的路径
    vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
        + "/bin/java");

    Path amTmpDir =
        new Path(MRApps.crossPlatformifyMREnv(conf, Environment.PWD),
            YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
    //-Djava.io.tmpdir=容器的临时目录
    vargs.add("-Djava.io.tmpdir=" + amTmpDir);
    MRApps.addLog4jSystemProperties(null, vargs, conf);

    //检查MAP和REDUCE配置中的Java Lib路径使用情况
    warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS, ""),
        "map",
        MRJobConfig.MAP_JAVA_OPTS,
        MRJobConfig.MAP_ENV);
    warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, ""),
        "map",
        MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
        MRJobConfig.MAPRED_ADMIN_USER_ENV);
    warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS, ""),
        "reduce",
        MRJobConfig.REDUCE_JAVA_OPTS,
        MRJobConfig.REDUCE_ENV);
    warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, ""),
        "reduce",
        MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
        MRJobConfig.MAPRED_ADMIN_USER_ENV);

    //在用户命令选择之前添加AM管理命令选项,以便用户可以覆盖它
    String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
        MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
    warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
        MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
    //默认是-Xmx1024m ,用户可以通过yarn.app.mapreduce.am.admin-command-opts设置
    vargs.add(mrAppMasterAdminOptions);

    //添加AM用户命令选项
    String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
        MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
    warnForJavaLibPath(mrAppMasterUserOptions, "app master",
        MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
    默认是-Xmx1024m ,用户可以通过yarn.app.mapreduce.am.command-opts设置
    vargs.add(mrAppMasterUserOptions);

    //默认false,可以通过yarn.app.mapreduce.am.profile设置
    if (jobConf.getBoolean(MRJobConfig.MR_AM_PROFILE,
        MRJobConfig.DEFAULT_MR_AM_PROFILE)) {
      final String profileParams = jobConf.get(MRJobConfig.MR_AM_PROFILE_PARAMS,
          MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS);
      if (profileParams != null) {
        //默认是-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=<LOG_DIR>/profile.out
        vargs.add(String.format(profileParams,
            ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR
                + TaskLog.LogName.PROFILE));
      }
    }

    //这里就是设置的启动类org.apache.hadoop.mapreduce.v2.app.MRAppMaster
    vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
    //1><LOG_DIR>/stdout
    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
        Path.SEPARATOR + ApplicationConstants.STDOUT);
    //2><LOG_DIR>/stderr
    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
        Path.SEPARATOR + ApplicationConstants.STDERR);
    return vargs;
  }

最终命令参数如下:

1、$JAVA_HOME/bin/java 
2、org.apache.hadoop.mapreduce.v2.app.MRAppMaster 
3、-Djava.io.tmpdir=容器的临时目录 
4、-Xmx1024m 
5、-Xmx1024m 
-6、agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=<LOG_DIR>/profile.out 
7、1><LOG_DIR>/stdout 
8、2><LOG_DIR>/stderr

1.3、submitJob

上面已经构建好了ApplicationSubmissionContext,下面可以提交了

public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
  throws IOException, InterruptedException {
   
    //......省略......

    ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

    //向ResourceManager提交ApplicationSubmissionContext 
      ApplicationId applicationId =
          resMgrDelegate.submitApplication(appContext);

    //......省略......

  }

2、YarnClient

resMgrDelegate是YarnClient的子类,最终也是通过调用YarnClient.submitApplication()提交到Yarn

该方法注释如下:

向YARN提交新申请,这是一个阻塞调用-在提交的应用程序成功提交并被ResourceManager接受之前,它不会返回ApplicationId。

用户在提交新应用程序时应提供ApplicationId作为参数ApplicationSubmissionContext的一部分,否则将引发ApplicationIdNotProvideredException。

这在内部调用ApplicationClientProtocol.submitApplication(SubmitApplicationRequest),之后,它在内部调用ApplicationClientProtocol.getApplicationReport(GetApplicationReportRequest)并等待,直到它可以确保应用程序正确提交为止。如果在ResourceManager保存应用程序的状态之前RM发生故障转移或RM重新启动,ApplicationClientProtocol.getApplicationReport(GetApplicationReportRequest)将抛出ApplicationNotFoundException。当此API捕获到ApplicationNotFoundException时,它会自动重新提交具有相同ApplicationSubmissionContext的应用程序。

YarnClient.submitApplication() 由子类实现,既:YarnClientImpl.submitApplication()

YarnClientImpl

public ApplicationId
      submitApplication(ApplicationSubmissionContext appContext)
          throws YarnException, IOException {
    ApplicationId applicationId = appContext.getApplicationId();
    if (applicationId == null) {
      throw new ApplicationIdNotProvidedException(
          "ApplicationId is not provided in ApplicationSubmissionContext");
    }
    //构建SubmitApplicationRequest
    //客户端发送的请求向ResourceManager提交应用程序
    //该请求通过ApplicationSubmissionContext包含队列、运行ApplicationMaster所需的资源等详细信息,相当于启动ApplicationMaster的ContainerLaunchContext等
    SubmitApplicationRequest request =
        Records.newRecord(SubmitApplicationRequest.class);
    request.setApplicationSubmissionContext(appContext);

    //自动将时间线DT添加到CLC中仅当安全和时间线服务都启用时
    if (isSecurityEnabled() && timelineV1ServiceEnabled) {
      addTimelineDelegationToken(appContext.getAMContainerSpec());
    }

    //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
    //在submitApplication调用期间处理RM故障切换。
    //这里会调用ApplicationClientProtocol.submitApplication()
    rmClient.submitApplication(request);

    int pollCount = 0;
    long startTime = System.currentTimeMillis();
    EnumSet<YarnApplicationState> waitingStates = 
                                 EnumSet.of(YarnApplicationState.NEW,
                                 YarnApplicationState.NEW_SAVING,
                                 YarnApplicationState.SUBMITTED);
    EnumSet<YarnApplicationState> failToSubmitStates = 
                                  EnumSet.of(YarnApplicationState.FAILED,
                                  YarnApplicationState.KILLED);		
    while (true) {
      try {
        ApplicationReport appReport = getApplicationReport(applicationId);
        YarnApplicationState state = appReport.getYarnApplicationState();
        //一直监控应用状态,直到状态不再是NEW、NEW_SAVING、SUBMITTED中的一个
        if (!waitingStates.contains(state)) {
          if(failToSubmitStates.contains(state)) {
            throw new YarnException("Failed to submit " + applicationId + 
                " to YARN : " + appReport.getDiagnostics());
          }
          LOG.info("Submitted application " + applicationId);
          break;
        }

        //......省略......
    }

    return applicationId;
  }

3、ApplicationClientProtocol

ApplicationClientProtocol是客户端和ResourceManager之间的协议,用于提交/中止作业以及获取有关应用程序、集群指标、节点、队列和ACL的信息。

submitApplication()注释如下:

客户端用于向ResourceManager提交新应用程序的接口

客户端需要通过SubmitApplicationRequest提供运行ApplicationMaster所需的队列、资源等详细信息,相当于ContainerLaunchContext,用于启动ApplicationMaster等

当前,ResourceManager在接受提交时立即(空)发送SubmitApplicationResponse,如果拒绝提交则抛出异常。但是,此调用之后需要执行getApplicationReport(GetApplicationReportRequest)以确保正确提交应用程序-从ResourceManager获得SubmitApplicationResponse并不保证RM在故障转移或重新启动之后“记住”此应用程序。如果在ResourceManager成功保存应用程序的状态之前发生RM故障切换或RM重新启动,则随后的getApplicationReport(GetApplicationReportReques)将抛出ApplicationNotFoundException。当客户端在(getApplicationReport(GetApplicationReportRequest)调用中遇到ApplicationNotFoundException时,客户端需要使用相同的ApplicationSubmissionContext重新提交应用程序。

在提交过程中,它会检查应用程序是否已经存在。如果应用程序存在,它将简单地返回SubmitApplicationResponse

在安全模式下,ResourceManager在接受应用程序提交之前验证对队列等的访问权限。

该方法最终会调用其子类ClientRMService.submitApplication()

4、ClientRMService

该类是资源管理器的客户端接口。该模块处理从客户端到资源管理器的所有rpc接口。

public SubmitApplicationResponse submitApplication(
      SubmitApplicationRequest request) throws YarnException, IOException {
    
    //......省略......
    
    //检查应用程序是否已经放入rmContext,如果是,只需返回响应
    if (rmContext.getRMApps().get(applicationId) != null) {
      LOG.info("This is an earlier submitted application: " + applicationId);
      return SubmitApplicationResponse.newInstance();
    }

    //......省略......

      //调用RMAppManager直接提交申请,我们接着看rmAppManager中的实现
      rmAppManager.submitApplication(submissionContext,
          System.currentTimeMillis(), userUgi);

    //......省略......

    return recordFactory
        .newRecordInstance(SubmitApplicationResponse.class);
  }

5、RMAppManager

该类管理资源管理器的应用程序列表。

protected void submitApplication(
      ApplicationSubmissionContext submissionContext, long submitTime,
      UserGroupInformation userUgi) throws YarnException {
    ApplicationId applicationId = submissionContext.getApplicationId();

    //将开始时间传递为-1。它最终将在RMAppImpl构造函数中设置
    RMAppImpl application = createAndPopulateNewRMApp(
        submissionContext, submitTime, userUgi, false, -1, null);
    try {
      //确定UserGroupInformation是使用Kerberos来确定用户身份,还是依赖于简单身份验证
      if (UserGroupInformation.isSecurityEnabled()) {
        this.rmContext.getDelegationTokenRenewer()
            .addApplicationAsync(applicationId,
                BuilderUtils.parseCredentials(submissionContext),
                submissionContext.getCancelTokensWhenComplete(),
                application.getUser(),
                BuilderUtils.parseTokensConf(submissionContext));
      } else {
        //调度程序此时尚未启动,因此应确保在调度程序启动时首先处理这些排队的START事件。
        //下面我们看下这个事件(RMAppEventType.START)是怎么处理的
        this.rmContext.getDispatcher().getEventHandler()
            .handle(new RMAppEvent(applicationId, RMAppEventType.START));
      }
    } catch (Exception e) {
      LOG.warn("Unable to parse credentials for " + applicationId, e);
      // 发送APP_REJECTED是可以的,因为我们假设RMApp处于NEW状态,因此我们还没有通知调度器应用程序的存在
      this.rmContext.getDispatcher().getEventHandler()
          .handle(new RMAppEvent(applicationId,
              RMAppEventType.APP_REJECTED, e.getMessage()));
      throw RPCUtil.getRemoteException(e);
    }
  }

RMAppEventType.START 是由RMAppImpl处理

6、RMAppImpl

private static final StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent> stateMachineFactory
                               = new StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent>(RMAppState.NEW)


     // Transitions from NEW state
    
    //......以上事件省略......

    //我们重点看这个事件的处理,既:RMAppNewlySavingTransition
    .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())
    

    //......以下事件省略......

    // Transitions from NEW_SAVING state
    
    //第9步会用到
    .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
        RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
    

     // Transitions from SUBMITTED state
    
    //第10步会用到
    .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
        RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())

     // Transitions from ACCEPTED state
    

     // Transitions from RUNNING state
    

     // Transitions from FINAL_SAVING state
    

     // Transitions from FINISHING state
    

     // Transitions from KILLING state
    

     // Transitions from FINISHED state
     // ignorable transitions
   

     // Transitions from FAILED state
     // ignorable transitions
    

     // Transitions from KILLED state
     // ignorable transitions
    

     .installTopology();

7、RMAppNewlySavingTransition

private static final class RMAppNewlySavingTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {

      long applicationLifetime =
          app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME);
      applicationLifetime = app.scheduler
          .checkAndGetApplicationLifetime(app.queue, applicationLifetime);
      //根据配置的队列生存期,验证提交的应用程序生存期是否有效。
      if (applicationLifetime > 0) {
        // calculate next timeout value
        Long newTimeout =
            Long.valueOf(app.submitTime + (applicationLifetime * 1000));
        app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
            ApplicationTimeoutType.LIFETIME, newTimeout);

        //使用新的绝对值更新applicationTimeouts
        app.applicationTimeouts.put(ApplicationTimeoutType.LIFETIME,
            newTimeout);

        LOG.info("Application " + app.applicationId
            + " is registered for timeout monitor, type="
            + ApplicationTimeoutType.LIFETIME + " value=" + applicationLifetime
            + " seconds");
      }

      //如果启用了恢复,则将应用程序信息存储在非阻塞调用中,以确保RM已存储了在RM重新启动后重新启动AM所需的信息,而无需进一步的客户端通信
      //这是一个新任务,我们看条线
      //非阻塞API资源管理器服务使用此来存储应用程序的状态此不阻塞调度程序线程RMAppStoredEvent将在完成时发送以通知RMApp
      LOG.info("Storing application with id " + app.applicationId);
      app.rmContext.getStateStore().storeNewApplication(app);
    }
  }

8、RMStateStore

public void storeNewApplication(RMApp app) {
    ApplicationSubmissionContext context = app
                                            .getApplicationSubmissionContext();
    assert context instanceof ApplicationSubmissionContextPBImpl;
    ApplicationStateData appState =
        ApplicationStateData.newInstance(app.getSubmitTime(),
            app.getStartTime(), context, app.getUser(), app.getRealUser(),
            app.getCallerContext());
    appState.setApplicationTimeouts(app.getApplicationTimeouts());
    //这里会设置一个状态:RMStateStoreEventType.STORE_APP,接下来我们看看这个事件怎么处理
    getRMStateStoreEventHandler().handle(new RMStateStoreAppEvent(appState));
  }
private static final StateMachineFactory<RMStateStore,
                                           RMStateStoreState,
                                           RMStateStoreEventType, 
                                           RMStateStoreEvent>
      stateMachineFactory = new StateMachineFactory<RMStateStore,
                                                    RMStateStoreState,
                                                    RMStateStoreEventType,
                                                    RMStateStoreEvent>(
      RMStateStoreState.ACTIVE)
      .addTransition(RMStateStoreState.ACTIVE,
          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
          RMStateStoreEventType.STORE_APP, new StoreAppTransition())
      
      //......省略......

    );
private static class StoreAppTransition
      implements MultipleArcTransition<RMStateStore, RMStateStoreEvent,
          RMStateStoreState> {
    @Override
    public RMStateStoreState transition(RMStateStore store,
        RMStateStoreEvent event) {

      //......省略......
        store.storeApplicationStateInternal(appId, appState);
        //接下来我们看看这个事件的处理RMAppEventType.APP_NEW_SAVED)
        store.notifyApplication(
            new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED));
      //......省略......

      return finalState(isFenced);
    };

  }

9、再回RMAppImpl

第6步已经有关相关代码,我们就不贴了,处理RMAppEventType.APP_NEW_SAVED事件调用的是new AddApplicationToSchedulerTransition()

private static final class AddApplicationToSchedulerTransition extends
      RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      //里面new了一个新事件(SchedulerEventType.APP_ADDED)
      app.handler.handle(
          new AppAddedSchedulerEvent(app.user, app.submissionContext, false,
              app.applicationPriority, app.placementContext));
      // 发送ATS创建事件
      app.sendATSCreateEvent();
    }
  }

SchedulerEventType.APP_ADDED事件会被调度器处理(FifoScheduler、CapacityScheduler、FairScheduler)

我们先看FairScheduler中的实现

10、FairScheduler

public void handle(SchedulerEvent event) {
    switch (event.getType()) {
    case NODE_ADDED:
      
    case NODE_REMOVED:
      
    case NODE_UPDATE:
      
    //......省略以上......

    case APP_ADDED:
      if (!(event instanceof AppAddedSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
      String queueName =
          resolveReservationQueueName(appAddedEvent.getQueue(),
              appAddedEvent.getApplicationId(),
              appAddedEvent.getReservationID(),
              appAddedEvent.getIsAppRecovering());
      if (queueName != null) {
        //向调度程序中添加一个新的应用程序,该应用程序具有给定的id、队列名称和用户。
        //即使用户或队列超过配置的限制,这也会接受新的应用程序,但该应用程序不会被标记为可运行。会生成一个RMAppEventType.APP_ACCEPTED事件,接下来我们看看这个事件的处理
        addApplication(appAddedEvent.getApplicationId(),
            queueName, appAddedEvent.getUser(),
            appAddedEvent.getIsAppRecovering());
      }
      break;

    //......省略以下......

    case APP_REMOVED:
      
    case NODE_RESOURCE_UPDATE:
      
    case APP_ATTEMPT_ADDED:
    //第12步会用到
    if (!(event instanceof AppAttemptAddedSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
          (AppAttemptAddedSchedulerEvent) event;
      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
        appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
        appAttemptAddedEvent.getIsAttemptRecovering());
      break;
      
    case APP_ATTEMPT_REMOVED:
      
    case RELEASE_CONTAINER:
      
    case CONTAINER_EXPIRED:
      
    default:
      LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
    }
  }

RMAppEventType.APP_ACCEPTE 的处理还是在RMAppImpl

11、再回RMAppImpl

第6步已经有关相关代码,我们就不贴了,处理RMAppEventType.APP_ACCEPTE 事件调用的是new StartAppAttemptTransition()

private static final class StartAppAttemptTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      app.createAndStartNewAttempt(false);
    };
  }

    |
    |
   \ /

private void
      createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
    //创建一个新的Attempt
    createNewAttempt();
    //这里会生成一个新事件RMAppAttemptEventType.START
    handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),
      transferStateFromPreviousAttempt));
  }

RMAppAttemptEventType.START事件由RMAppAttemptImpl进行处理

12、RMAppAttemptImpl

private static final StateMachineFactory<RMAppAttemptImpl,
                                           RMAppAttemptState,
                                           RMAppAttemptEventType,
                                           RMAppAttemptEvent>
       stateMachineFactory  = new StateMachineFactory<RMAppAttemptImpl,
                                            RMAppAttemptState,
                                            RMAppAttemptEventType,
                                     RMAppAttemptEvent>(RMAppAttemptState.NEW)

       // Transitions from NEW State
      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,
          RMAppAttemptEventType.START, new AttemptStartedTransition())
      
      //......省略以下......
          
      // Transitions from SUBMITTED state
      
          
       // Transitions from SCHEDULED State
       //第13步会用到
       .addTransition(RMAppAttemptState.SUBMITTED, 
          EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
                     RMAppAttemptState.SCHEDULED),
          RMAppAttemptEventType.ATTEMPT_ADDED,
          new ScheduleTransition())
      
       // Transitions from SUBMITTED state
       //第16步会用到
       .addTransition(RMAppAttemptState.SUBMITTED, 
          EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
                     RMAppAttemptState.SCHEDULED),
          RMAppAttemptEventType.ATTEMPT_ADDED,
          new ScheduleTransition())

       // Transitions from ALLOCATED_SAVING State
       //第16步会用到
       .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
          RMAppAttemptState.ALLOCATED,
          RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())
      

       // Transitions from LAUNCHED_UNMANAGED_SAVING State
     

       // Transitions from ALLOCATED State
     

       // Transitions from LAUNCHED State
      

       // Transitions from RUNNING State
      

       // Transitions from FINAL_SAVING State
      

      // Transitions from FAILED State
      

      // Transitions from FINISHING State
      

      // Transitions from FINISHED State
      

      // Transitions from KILLED State
      
    .installTopology();

RMAppAttemptEventType.START事件由AttemptStartedTransition()处理

private static final class AttemptStartedTransition extends BaseTransition {
	@Override
    public void transition(RMAppAttemptImpl appAttempt,
        RMAppAttemptEvent event) {
      
      //......省略......

      //将应用程序尝试添加到计划程序,并通知计划程序是否从上次尝试转移状态。
      //生成新的事件SchedulerEventType.APP_ATTEMPT_ADDED
      appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
        appAttempt.applicationAttemptId, transferStateFromPreviousAttempt));
    }
  }

13、再回FairScheduler

第10步FairScheduler中有对SchedulerEventType.APP_ATTEMPT_ADDED事件的处理,这里就不贴代码了

protected void addApplicationAttempt(
      ApplicationAttemptId applicationAttemptId,
      boolean transferStateFromPreviousAttempt,
      boolean isAttemptRecovering) {
    writeLock.lock();
    
        //生成一个新的事件RMAppAttemptEventType.ATTEMPT_ADDED
        rmContext.getDispatcher().getEventHandler().handle(
            new RMAppAttemptEvent(applicationAttemptId,
                RMAppAttemptEventType.ATTEMPT_ADDED));
        
  }

14、再回RMAppAttemptImpl

第12步已经有关相关代码,我们就不贴了,处理RMAppAttemptEventType.ATTEMPT_ADDED

事件调用的是new ScheduleTransition()

 public static final class ScheduleTransition
      implements
      MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
    @Override
    public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
        RMAppAttemptEvent event) {

       //......省略......

        //提交时已检查AM资源
        //这里会通过YarnScheduler申请容器
        Allocation amContainerAllocation =
            appAttempt.scheduler.allocate(
                appAttempt.applicationAttemptId,
                appAttempt.amReqs, null, EMPTY_CONTAINER_RELEASE_LIST,
                amBlacklist.getBlacklistAdditions(),
                amBlacklist.getBlacklistRemovals(),
                new ContainerUpdates());
        if (amContainerAllocation != null
            && amContainerAllocation.getContainers() != null) {
          assert (amContainerAllocation.getContainers().size() == 0);
        }
        appAttempt.scheduledTime = System.currentTimeMillis();
        //现在状态是RMAppAttemptState.SCHEDULED
        return RMAppAttemptState.SCHEDULED;

      //......省略......

    }
  }

YarnScheduler.allocate(),最终会调用三种调度器中的方法进行实现,我们还是先只看FairScheduler

15、再回FairScheduler

public Allocation allocate(ApplicationAttemptId appAttemptId,
      List<ResourceRequest> ask, List<SchedulingRequest> schedulingRequests,
      List<ContainerId> release, List<String> blacklistAdditions,
      List<String> blacklistRemovals, ContainerUpdates updateRequests) {
    //请确保此应用程序存在
    FSAppAttempt application = getSchedulerApp(appAttemptId);
    if (application == null) {
      LOG.error("Calling allocate on removed or non existent application " +
          appAttemptId.getApplicationId());
      return EMPTY_ALLOCATION;
    }

    //分配可能是上一次尝试的剩余部分,它将影响当前尝试,例如混淆当前尝试的AM容器的请求和分配。
    //请注意,尝试id的外部先决条件检查在这里可能已经过时,因此有必要在这里进行双重检查。
    if (!application.getApplicationAttemptId().equals(appAttemptId)) {
      LOG.error("Calling allocate on previous or removed " +
          "or non existent application attempt " + appAttemptId);
      return EMPTY_ALLOCATION;
    }

    ApplicationId applicationId = application.getApplicationId();
    FSLeafQueue queue = application.getQueue();
    List<MaxResourceValidationResult> invalidAsks =
            validateResourceRequests(ask, queue);

    //如果检测到任何无效的请求,我们需要在这里快速失败。如果我们稍后抛出异常,这可能会有问题,        
    //因为令牌和升级/降级的容器会丢失,因为调度程序会立即清除它们,AM不会获得这些信息。
    if (!invalidAsks.isEmpty()) {
      throw new SchedulerInvalidResoureRequestException(String.format(
              "Resource request is invalid for application %s because queue %s "
                      + "has 0 amount of resource for a resource type! "
                      + "Validation result: %s",
              applicationId, queue.getName(), invalidAsks));
    }

    //处理晋升和降级
    handleContainerUpdates(application, updateRequests);

    //健全性检查
    normalizeResourceRequests(ask, queue.getName());

    // TODO, 正常计划请求

    //记录容器分配开始时间
    application.recordContainerRequestTime(getClock().getTime());

    //释放容器
    releaseContainers(release, application);

    ReentrantReadWriteLock.WriteLock lock = application.getWriteLock();
    lock.lock();
    try {
      if (!ask.isEmpty()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
              "allocate: pre-update" + " applicationAttemptId=" + appAttemptId
                  + " application=" + application.getApplicationId());
        }
        application.showRequests();

        //更新应用程序请求
        application.updateResourceRequests(ask);

        // TODO, 处理SchedulingRequest
        application.showRequests();
      }
    } finally {
      lock.unlock();
    }

    Set<ContainerId> preemptionContainerIds =
        application.getPreemptionContainerIds();
    if (LOG.isDebugEnabled()) {
      LOG.debug(
          "allocate: post-update" + " applicationAttemptId=" + appAttemptId
              + " #ask=" + ask.size() + " reservation= " + application
              .getCurrentReservation());

      LOG.debug("Preempting " + preemptionContainerIds.size()
          + " container(s)");
    }

    application.updateBlacklist(blacklistAdditions, blacklistRemovals);

    List<Container> newlyAllocatedContainers =
        application.pullNewlyAllocatedContainers();
    //记录容器分配时间
    if (!(newlyAllocatedContainers.isEmpty())) {
      application.recordContainerAllocationTime(getClock().getTime());
    }

    //净空取决于集群中的资源、队列的当前使用情况、队列的公平共享和队列的最大资源。
    Resource headroom = application.getHeadroom();
    application.setApplicationHeadroomForMetrics(headroom);

    //当AM心跳时调用。AM注册后,RM回收了这些集装箱
    //它们在AllocateResponse.containersFromPreviousAttempts()中报告给AM
    List<Container> previousAttemptContainers = application
        .pullPreviousAttemptContainers();
    //NMToken用于验证与NodeManager的通信
    //ApplicationMaster与ResourceManager协商资源时,ResourceMananger发出,NodeManager端验证
    List<NMToken> updatedNMTokens = application.pullUpdatedNMTokens();
    //制作分配单元
    return new Allocation(newlyAllocatedContainers, headroom,
        preemptionContainerIds, null, null,
        updatedNMTokens, null, null,
        application.pullNewlyPromotedContainers(),
        application.pullNewlyDemotedContainers(),
        previousAttemptContainers);
  }

分配单元构建成功后,我们看下RMAppAttemptState.SCHEDULED事件的处理,

16、再回RMAppAttemptImpl

第12步已经有关相关代码,我们就不贴了,处理RMAppAttemptState.SCHEDULED

事件调用的是new AMContainerAllocatedTransition()

private static final class AMContainerAllocatedTransition
      implements
      MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
    @Override
    public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
        RMAppAttemptEvent event) {
      //从调度程序获取AM容器。
      Allocation amContainerAllocation =
          appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
            EMPTY_CONTAINER_REQUEST_LIST, null, EMPTY_CONTAINER_RELEASE_LIST, null,
            null, new ContainerUpdates());
      // 必须至少分配一个容器,因为container_allocated是在构造RMContainer并将其放入SchedulerApplication.newlyAllocatedContainers()之后发出的。

      //请注意,YarnScheduler.allocate()不能保证能够提取它,因为容器可能由于某些原因而无法提取,例如DNS不可用导致无法生成容器令牌。因此,我们返回到以前的状态并保持重试,直到提取到容器为止
      if (amContainerAllocation.getContainers().size() == 0) {
        appAttempt.retryFetchingAMContainer(appAttempt);
        return RMAppAttemptState.SCHEDULED;
      }

      //设置主容器
      Container amContainer = amContainerAllocation.getContainers().get(0);
      RMContainerImpl rmMasterContainer = (RMContainerImpl)appAttempt.scheduler
          .getRMContainer(amContainer.getId());
      //当删除一个NM时,调度器将清理容器,下面的container_FINISHED事件将处理已清理的容器。所以只需返回RMAppAttemptState。已排定
      if (rmMasterContainer == null) {
        return RMAppAttemptState.SCHEDULED;
      }
      //为尝试应用分配AM容器
      appAttempt.setMasterContainer(amContainer);
      rmMasterContainer.setAMContainer(true);
      //NMTokenSecrentManager中设置的节点用于标记此节点的NMToken是否已颁发给AM。
      //当AM容器分配给RM本身时,分配此AM容器的节点被标记为已发送的NMToken。
      //因此,清除此节点集,以便来自AM的以下分配请求能够检索相应的NMToken。
      appAttempt.rmContext.getNMTokenSecretManager()
        .clearNodeSetForAttempt(appAttempt.applicationAttemptId);
      appAttempt.getSubmissionContext().setResource(
        appAttempt.getMasterContainer().getResource());
      appAttempt.containerAllocatedTime = System.currentTimeMillis();
      long allocationDelay =
          appAttempt.containerAllocatedTime - appAttempt.scheduledTime;
      ClusterMetrics.getMetrics().addAMContainerAllocationDelay(
          allocationDelay);
      appAttempt.storeAttempt();
      //此时处理这个事件
      return RMAppAttemptState.ALLOCATED_SAVING;
    }
  }

第12步已经有关相关代码,处理RMAppAttemptState.ALLOCATED_SAVING

事件调用的是new AttemptStoredTransition()

private static final class AttemptStoredTransition extends BaseTransition {
    @Override
    public void transition(RMAppAttemptImpl appAttempt,
                                                    RMAppAttemptEvent event) {
      //将ClientTokenMasterKey保存到存储中后进行注册,否则RM重新启动后客户端可能会持有无效的ClientToken。
      appAttempt.registerClientToken();
      
      appAttempt.launchAttempt();
    }
  }

private void launchAttempt(){
    launchAMStartTime = System.currentTimeMillis();
    //发送事件以启动AM容器AMLauncherEventType.LAUNCH
    eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this));
  }

五、流程总结

1、构建启动MR AM所需的所有信息(LocalResource、ContainerLaunchContext、启动命令
、ApplicationSubmissionContext等)

2、构建SubmitApplicationRequest(客户端需要通过SubmitApplicationRequest提供运行ApplicationMaster所需的队列、资源等详细信息,相当于ContainerLaunchContext,用于启动ApplicationMaster等)

3、YarnClient通过ApplicationClientProtocol提交SubmitApplicationRequest (ApplicationClientProtocol是客户端和ResourceManager之间的协议,用于提交/中止作业以及获取有关应用程序、集群指标、节点、队列和ACL的信息。)

4、转到ClientRMService提交

5、转到RMAppManager提交

6、创建RMAppEventType.START事件并处理

7、创建RMStateStoreEventType.STORE_APP事件并处理

8、创建RMAppEventType.APP_NEW_SAVED事件并被调度器处理

9、创建RMAppEventType.APP_ACCEPTE事件并处理

10、创建RMAppAttemptEventType.START事件并处理

11、创建SchedulerEventType.APP_ATTEMPT_ADDED事件并被调度器处理

12、创建RMAppAttemptState.SCHEDULED事件并处理

13、创建RMAppAttemptState.ALLOCATED_SAVING事件并处理

14、创建AMLauncherEventType.LAUNCH事件启动ApplicationMaster容器

15、使用命令启动MRAppMaster