Hadoop 2.x YARN Job Submission (Client Side)

Date: 2022-09-09 08:34:37
The YARN job-submission client still uses the RunJar class, just as in MR1; see
http://blog.csdn.net/lihm0_1/article/details/13629375
In 1.x the job was submitted to the JobTracker; in 2.x it goes to the ResourceManager, and the client-side proxy object has changed accordingly, to YARNRunner. The overall flow, however, is similar to 1.x, and the bulk of it lives in JobSubmitter.submitJobInternal: checking that the output directory is valid, setting job-submission info (host and user), obtaining a JobID, copying the files the job needs (job.jar, job.xml, the split files, and so on) to HDFS, and finally performing the actual submission. As before, WordCount serves as the running example.

public static void main(String[] args) throws Exception {
    // Create a job
    Configuration conf = new Configuration();
    conf.set("mapreduce.job.queuename", "p1");

    @SuppressWarnings("deprecation")
    Job job = new Job(conf, "WordCount");
    job.setJarByClass(WordCount.class);
    job.setJar("/root/wordcount-2.3.0.jar");

    // Set the output key/value types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Set the map and reduce classes
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReduce.class);

    // Set the input and output paths
    FileInputFormat.addInputPath(job, new Path("/tmp/a.txt"));
    FileOutputFormat.setOutputPath(job, new Path("/tmp/output"));

    // Enters the job-submission flow, then polls the job status in a loop
    job.waitForCompletion(true);
}
As in 1.x, this submits the job and then polls its status in a loop:
public boolean waitForCompletion(boolean verbose)
    throws IOException, InterruptedException, ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit(); // submit the job
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
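As an aside, the poll interval above comes from mapreduce.client.completion.pollinterval (5000 ms by default). A hedged fragment for the driver, with 1000 ms as a purely illustrative value for a small test cluster:

// Assumption: 1000 ms is illustrative; the default of 5000 ms is usually fine
conf.setInt("mapreduce.client.completion.pollinterval", 1000);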
The interesting function is submit(), which shows how the job actually gets submitted. It already differs from 1.x, but still falls into two phases: 1) connect to the master; 2) submit the job.
public void submit()
    throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  // Connect to the RM
  connect();
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
        ClassNotFoundException {
      // Submit the job
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
Connecting to the master builds a Cluster instance. Here is the Cluster constructor; the important part is the initialize call:
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {
  this.conf = conf;
  this.ugi = UserGroupInformation.getCurrentUser();
  initialize(jobTrackAddr, conf);
}
The client proxy is created with java.util.ServiceLoader. As of version 2.3.0 there are two providers: LocalClientProtocolProvider (local jobs) and YarnClientProtocolProvider (YARN jobs). Which client gets created depends on the mapreduce.framework.name setting.
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {
  synchronized (frameworkLoader) {
    for (ClientProtocolProvider provider : frameworkLoader) {
      LOG.debug("Trying ClientProtocolProvider : "
          + provider.getClass().getName());
      ClientProtocol clientProtocol = null;
      try {
        if (jobTrackAddr == null) {
          // Creates the YARNRunner object
          clientProtocol = provider.create(conf);
        } else {
          clientProtocol = provider.create(jobTrackAddr, conf);
        }
        // Initialize Cluster's internal member variables
        if (clientProtocol != null) {
          clientProtocolProvider = provider;
          client = clientProtocol;
          LOG.debug("Picked " + provider.getClass().getName()
              + " as the ClientProtocolProvider");
          break;
        } else {
          LOG.debug("Cannot pick " + provider.getClass().getName()
              + " as the ClientProtocolProvider - returned null protocol");
        }
      } catch (Exception e) {
        LOG.info("Failed to use " + provider.getClass().getName()
            + " due to error: " + e.getMessage());
      }
    }
  }
  // Error handling: reaching here means the loaded jars are broken --
  // perhaps the jar containing YARNRunner is missing?
  if (null == clientProtocolProvider || null == client) {
    throw new IOException(
        "Cannot initialize Cluster. Please check your configuration for "
            + MRConfig.FRAMEWORK_NAME
            + " and the correspond server addresses.");
  }
}
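As an aside, frameworkLoader above is a java.util.ServiceLoader<ClientProtocolProvider>, so discovery is driven by META-INF/services entries on the classpath rather than hard-coded class names. A minimal standalone sketch of the same pattern (illustrative only, not Hadoop source):

import java.util.ServiceLoader;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;

public class ListProviders {
  public static void main(String[] args) {
    // ServiceLoader instantiates every provider registered under
    // META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
    for (ClientProtocolProvider p : ServiceLoader.load(ClientProtocolProvider.class)) {
      // With the 2.3.0 jobclient jars on the classpath this should print
      // LocalClientProtocolProvider and YarnClientProtocolProvider
      System.out.println(p.getClass().getName());
    }
  }
}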
What provider.create builds here is a YARNRunner object, because the job being submitted is a YARN job, not a local one:
@Override
public ClientProtocol create(Configuration conf) throws IOException {
  if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
    return new YARNRunner(conf);
  }
  return null;
}
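So the YARN branch is taken only when mapreduce.framework.name resolves to "yarn" (MRConfig.YARN_FRAMEWORK_NAME); with the 2.x default of "local", LocalClientProtocolProvider wins and the job runs locally. Setting it in the driver, equivalent to the usual mapred-site.xml entry:

// MRConfig.FRAMEWORK_NAME is "mapreduce.framework.name"; the 2.x default is "local"
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);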
The chain of objects behind the client proxy looks like this:
Cluster -> ClientProtocol (YARNRunner) -> ResourceMgrDelegate -> client (YarnClientImpl) -> rmClient (ApplicationClientProtocol)

The RPC proxy is created in YarnClientImpl's serviceStart phase; note the protocol class used:
protected void serviceStart() throws Exception {
  try {
    rmClient = ClientRMProxy.createRMProxy(getConfig(),
        ApplicationClientProtocol.class);
  } catch (IOException e) {
    throw new YarnRuntimeException(e);
  }
  super.serviceStart();
}
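For experimentation, the same proxy can be driven directly through the YarnClient facade, whose service lifecycle invokes the serviceStart() shown above. A minimal sketch, assuming a reachable RM configured in yarn-site.xml on the classpath:

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;

public class ListApps {
  public static void main(String[] args) throws Exception {
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(new Configuration());
    yarnClient.start(); // serviceStart() builds the ApplicationClientProtocol proxy
    List<ApplicationReport> apps = yarnClient.getApplications();
    System.out.println("Applications known to the RM: " + apps.size());
    yarnClient.stop();
  }
}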
The YARNRunner constructor is as follows:
public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
    ClientCache clientCache) {
  this.conf = conf;
  try {
    this.resMgrDelegate = resMgrDelegate;
    this.clientCache = clientCache;
    this.defaultFileContext = FileContext.getFileContext(this.conf);
  } catch (UnsupportedFileSystemException ufe) {
    throw new RuntimeException("Error in instantiating YarnClient", ufe);
  }
}
Now for the core of the submission, JobSubmitter.submitJobInternal:
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {
  // Validate the output spec: fails if the output dir is unset or already exists
  checkSpecs(job);
  Configuration conf = job.getConfiguration();
  addMRFrameworkToDistributedCache(conf);
  // Get the staging area holding files used while the job runs.
  // Default: /tmp/hadoop-yarn/staging/root/.staging (for user root),
  // configurable via yarn.app.mapreduce.am.staging-dir
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  // Record the submitting host name and address
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  // Get a new JobID; this requires an RPC call
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  // Submit dir, e.g. /tmp/hadoop-yarn/staging/root/.staging/job_1395778831382_0002
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }

    // Copy the files the job needs to the cluster; analyzed separately in (1)
    copyAndConfigureFiles(job, submitJobDir);

    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Write the split files (job.split, job.splitmetainfo); the write path is
    // the same as in MR1, covered in an earlier article
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    // Set the queue name
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Write the job file (job.xml) to the submit dir
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    // The real submission starts here; see analysis (2) below
    //
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}
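Two settings referenced in the comments above are worth knowing as a user: the staging root defaults to /tmp/hadoop-yarn/staging and comes from yarn.app.mapreduce.am.staging-dir (it must match what the cluster expects, so treat it as a cluster-level knob), and the queue comes from mapreduce.job.queuename, which the WordCount driver at the top already sets. A hedged fragment for the driver; the staging path here is purely illustrative:

// Assumption: an illustrative staging root; keep it consistent across client and cluster
conf.set("yarn.app.mapreduce.am.staging-dir", "/user/mr-staging");
// Queue selection, read back via MRJobConfig.QUEUE_NAME during submission
conf.set("mapreduce.job.queuename", "p1");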
(1) The file-copy step. Note that the default replication factor is 10:
private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
    throws IOException {
  Configuration conf = job.getConfiguration();
  short replication = (short) conf.getInt(Job.SUBMIT_REPLICATION, 10);
  // Start the copy
  copyAndConfigureFiles(job, jobSubmitDir, replication);

  // Set the working directory
  if (job.getWorkingDirectory() == null) {
    job.setWorkingDirectory(jtFs.getWorkingDirectory());
  }
}
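The factor of 10 comes from Job.SUBMIT_REPLICATION (mapreduce.client.submit.file.replication). It is deliberately high: every NodeManager that runs a task has to localize job.jar, and extra replicas spread that read load. On a small cluster it can be lowered in the driver; the value 3 below is illustrative:

// Job.SUBMIT_REPLICATION is "mapreduce.client.submit.file.replication" (default 10)
conf.setInt(Job.SUBMIT_REPLICATION, 3);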
Below is the copy itself; the inline comments explain most of it:
// configures -files, -libjars and -archives.
private void copyAndConfigureFiles(Job job, Path submitJobDir,
    short replication) throws IOException {
  Configuration conf = job.getConfiguration();
  if (!(conf.getBoolean(Job.USED_GENERIC_PARSER, false))) {
    LOG.warn("Hadoop command-line option parsing not performed. " +
             "Implement the Tool interface and execute your application " +
             "with ToolRunner to remedy this.");
  }

  // get all the command line arguments passed in by the user conf
  String files = conf.get("tmpfiles");
  String libjars = conf.get("tmpjars");
  String archives = conf.get("tmparchives");
  String jobJar = job.getJar();

  //
  // Figure out what fs the JobTracker is using.  Copy the
  // job to it, under a temporary name.  This allows DFS to work,
  // and under the local fs also provides UNIX-like object loading
  // semantics.  (that is, if the job file is deleted right after
  // submission, we can still run the submission to completion)
  //

  // Create a number of filenames in the JobTracker's fs namespace
  LOG.debug("default FileSystem: " + jtFs.getUri());
  if (jtFs.exists(submitJobDir)) {
    throw new IOException("Not submitting job. Job directory " + submitJobDir
        + " already exists!! This is unexpected.Please check what's there in" +
        " that directory");
  }
  submitJobDir = jtFs.makeQualified(submitJobDir);
  submitJobDir = new Path(submitJobDir.toUri().getPath());
  FsPermission mapredSysPerms =
      new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
  // Create the submit directory
  FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);
  Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
  Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
  Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);

  // add all the command line files/ jars and archive
  // first copy them to jobtrackers filesystem
  // Create each of the directories above as needed
  if (files != null) {
    FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms);
    String[] fileArr = files.split(",");
    for (String tmpFile : fileArr) {
      URI tmpURI = null;
      try {
        tmpURI = new URI(tmpFile);
      } catch (URISyntaxException e) {
        throw new IllegalArgumentException(e);
      }
      Path tmp = new Path(tmpURI);
      Path newPath = copyRemoteFiles(filesDir, tmp, conf, replication);
      try {
        URI pathURI = getPathURI(newPath, tmpURI.getFragment());
        DistributedCache.addCacheFile(pathURI, conf);
      } catch (URISyntaxException ue) {
        // should not throw a uri exception
        throw new IOException("Failed to create uri for " + tmpFile, ue);
      }
    }
  }

  if (libjars != null) {
    FileSystem.mkdirs(jtFs, libjarsDir, mapredSysPerms);
    String[] libjarsArr = libjars.split(",");
    for (String tmpjars : libjarsArr) {
      Path tmp = new Path(tmpjars);
      Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
      DistributedCache.addFileToClassPath(
          new Path(newPath.toUri().getPath()), conf);
    }
  }

  if (archives != null) {
    FileSystem.mkdirs(jtFs, archivesDir, mapredSysPerms);
    String[] archivesArr = archives.split(",");
    for (String tmpArchives : archivesArr) {
      URI tmpURI;
      try {
        tmpURI = new URI(tmpArchives);
      } catch (URISyntaxException e) {
        throw new IllegalArgumentException(e);
      }
      Path tmp = new Path(tmpURI);
      Path newPath = copyRemoteFiles(archivesDir, tmp, conf, replication);
      try {
        URI pathURI = getPathURI(newPath, tmpURI.getFragment());
        DistributedCache.addCacheArchive(pathURI, conf);
      } catch (URISyntaxException ue) {
        // should not throw an uri exception
        throw new IOException("Failed to create uri for " + tmpArchives, ue);
      }
    }
  }

  if (jobJar != null) { // copy jar to JobTracker's fs
    // use jar name if job is not named.
    if ("".equals(job.getJobName())) {
      job.setJobName(new Path(jobJar).getName());
    }
    Path jobJarPath = new Path(jobJar);
    URI jobJarURI = jobJarPath.toUri();
    // If the job jar is already in fs, we don't need to copy it from local fs
    if (jobJarURI.getScheme() == null || jobJarURI.getAuthority() == null
        || !(jobJarURI.getScheme().equals(jtFs.getUri().getScheme())
             && jobJarURI.getAuthority().equals(
                 jtFs.getUri().getAuthority()))) {
      // Copy wordcount-2.3.0.jar; note it is renamed to job.jar at the
      // destination, with replication factor 10
      copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir),
          replication);
      job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
    }
  } else {
    LOG.warn("No job jar file set.  User classes may not be found. " +
        "See Job or Job#setJar(String).");
  }

  // set the timestamps of the archives and files
  // set the public/private visibility of the archives and files
  ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf);
  // get DelegationToken for each cached file
  ClientDistributedCacheManager.getDelegationTokens(conf,
      job.getCredentials());
}
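The tmpfiles/tmpjars/tmparchives keys read at the top of that method are populated by GenericOptionsParser, which is why the code warns when Job.USED_GENERIC_PARSER is not set. A hedged sketch of a driver that inherits -files/-libjars/-archives parsing by implementing Tool (class name and command line are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountDriver extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    // getConf() already carries tmpfiles/tmpjars filled in by GenericOptionsParser
    Job job = Job.getInstance(getConf(), "WordCount");
    // ... the same mapper/reducer/path setup as the main() at the top ...
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    // e.g. hadoop jar wordcount-2.3.0.jar WordCountDriver -files dict.txt -libjars extra.jar
    System.exit(ToolRunner.run(new Configuration(), new WordCountDriver(), args));
  }
}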
(2) The actual submission:
@Override
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
    throws IOException, InterruptedException {
  addHistoryToken(ts);

  // Construct necessary information to start the MR AM
  ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

  // Submit to ResourceManager
  try {
    ApplicationId applicationId =
        resMgrDelegate.submitApplication(appContext);
    ApplicationReport appMaster = resMgrDelegate
        .getApplicationReport(applicationId);
    String diagnostics =
        (appMaster == null ?
            "application report is null" : appMaster.getDiagnostics());
    if (appMaster == null
        || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
        || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
      throw new IOException("Failed to run job : " + diagnostics);
    }
    return clientCache.getClient(jobId).getJobStatus(jobId);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
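The resMgrDelegate.submitApplication call lands in YarnClientImpl.submitApplication, which is the generic YARN submission API: ask the RM for an ApplicationId, fill in an ApplicationSubmissionContext, and submit it. A hedged sketch of that handshake for an arbitrary application; the AM command below is a placeholder, so the RM would eventually mark this app FAILED because nothing registers as an AM:

import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.util.Records;

public class SubmitSkeleton {
  public static void main(String[] args) throws Exception {
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(new Configuration());
    yarnClient.start();

    // Step 1: obtain a new ApplicationId from the RM
    YarnClientApplication app = yarnClient.createApplication();
    ApplicationSubmissionContext ctx = app.getApplicationSubmissionContext();
    ctx.setApplicationName("demo");
    ctx.setQueue("p1"); // same queue as the WordCount example

    // Step 2: describe the AM container; "sleep 60" is a stand-in, not a real AM
    ContainerLaunchContext amSpec = Records.newRecord(ContainerLaunchContext.class);
    amSpec.setCommands(Collections.singletonList("sleep 60"));
    ctx.setAMContainerSpec(amSpec);
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(512); // MB for the AM container
    ctx.setResource(capability);

    // Step 3: submit, mirroring resMgrDelegate.submitApplication above
    ApplicationId id = yarnClient.submitApplication(ctx);
    System.out.println("Submitted " + id);
    yarnClient.stop();
  }
}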