http://blog.csdn.net/lihm0_1/article/details/13629375
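In 1.x, jobs are submitted to the JobTracker; in 2.x the target is the ResourceManager, and the client-side proxy object changes as well, becoming YARNRunner. The overall flow is still similar to 1.x. Most of the work happens in JobSubmitter.submitJobInternal: checking that the output directory is valid, recording the submission info (host and user), obtaining a JobID, copying the files the job needs into HDFS (job.jar, job.xml, the split files, and so on), and finally performing the submission. As before, WordCount is used as the example to walk through the submission flow.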
public static void main(String[] args) throws Exception {
  // Create a job. As in 1.x, the job is submitted and its status is then polled in a loop
  Configuration conf = new Configuration();
  conf.set("mapreduce.job.queuename", "p1");
  @SuppressWarnings("deprecation")
  Job job = new Job(conf, "WordCount");
  job.setJarByClass(WordCount.class);
  job.setJar("/root/wordcount-2.3.0.jar");
  // Set the output key and value types
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  // Set the map and reduce classes
  job.setMapperClass(WordCountMapper.class);
  job.setReducerClass(WordCountReduce.class);
  // Set the input and output paths
  FileInputFormat.addInputPath(job, new Path("/tmp/a.txt"));
  FileOutputFormat.setOutputPath(job, new Path("/tmp/output"));
  // Enters the job submission flow, then polls the job status in a loop
  job.waitForCompletion(true);
}
public boolean waitForCompletion(boolean verbose)
    throws IOException, InterruptedException, ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit(); // submit the job
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
The function to focus on is submit, which shows how the job is actually submitted. It already differs from 1.x, but it still has two phases: first connect to the master, then submit the job.
public void submit()
    throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  // Connect to the RM
  connect();
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
        ClassNotFoundException {
      // Submit the job
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
Connecting to the master creates a Cluster instance. Below is the Cluster constructor; the important part is the call to initialize.
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {
  this.conf = conf;
  this.ugi = UserGroupInformation.getCurrentUser();
  initialize(jobTrackAddr, conf);
}
The client proxy is created with the help of java.util.ServiceLoader. Version 2.3.0 currently ships two providers, LocalClientProtocolProvider (local jobs) and YarnClientProtocolProvider (YARN jobs), and the matching client is created according to the value of mapreduce.framework.name.
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {
  synchronized (frameworkLoader) {
    for (ClientProtocolProvider provider : frameworkLoader) {
      LOG.debug("Trying ClientProtocolProvider : "
          + provider.getClass().getName());
      ClientProtocol clientProtocol = null;
      try {
        if (jobTrackAddr == null) {
          // Creates the YARNRunner object
          clientProtocol = provider.create(conf);
        } else {
          clientProtocol = provider.create(jobTrackAddr, conf);
        }
        // Initialize the Cluster member fields
        if (clientProtocol != null) {
          clientProtocolProvider = provider;
          client = clientProtocol;
          LOG.debug("Picked " + provider.getClass().getName()
              + " as the ClientProtocolProvider");
          break;
        } else {
          LOG.debug("Cannot pick " + provider.getClass().getName()
              + " as the ClientProtocolProvider - returned null protocol");
        }
      } catch (Exception e) {
        LOG.info("Failed to use " + provider.getClass().getName()
            + " due to error: " + e.getMessage());
      }
    }
  }
  // Error handling: an exception here suggests a problem with the loaded jars;
  // perhaps the jar containing YARNRunner is missing?
  if (null == clientProtocolProvider || null == client) {
    throw new IOException(
        "Cannot initialize Cluster. Please check your configuration for "
            + MRConfig.FRAMEWORK_NAME
            + " and the correspond server addresses.");
  }
}
When the provider creates the proxy, it actually constructs a YARNRunner object, because the job submitted here is a YARN job rather than a local one.
@Override
public ClientProtocol create(Configuration conf) throws IOException {
  if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
    return new YARNRunner(conf);
  }
  return null;
}
The client proxy is created through the following chain:
Cluster -> ClientProtocol (YARNRunner) -> ResourceMgrDelegate -> client (YarnClientImpl) -> rmClient (ApplicationClientProtocol)
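The provider selection described above can be illustrated with a small sketch in plain Java, with no Hadoop dependency. Everything in it is a hypothetical stand-in: Client and Provider play the roles of ClientProtocol and ClientProtocolProvider, the returned client names are invented, and the providers are kept in a fixed list rather than discovered through java.util.ServiceLoader. The assumption that the local provider treats a missing framework setting as "local" is mine and should be checked against the Hadoop source. The point it shows is only the rule used in Cluster.initialize: each provider is asked in turn, a provider that does not match the configured framework returns null, and the first non-null client wins.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins only; none of these types are Hadoop classes.
class ProviderSelectionSketch {

  /** Stand-in for ClientProtocol. */
  interface Client {
    String name();
  }

  /** Stand-in for ClientProtocolProvider: returns null when it does not apply. */
  interface Provider {
    Client create(Map<String, String> conf);
  }

  static final String FRAMEWORK_KEY = "mapreduce.framework.name";

  /** Assumed behavior: accepts only "local", and treats a missing value as "local". */
  static class LocalProvider implements Provider {
    public Client create(Map<String, String> conf) {
      String framework = conf.getOrDefault(FRAMEWORK_KEY, "local");
      if (!"local".equals(framework)) {
        return null;
      }
      return () -> "local-client";
    }
  }

  /** Mirrors the create() method shown above: accepts only "yarn". */
  static class YarnProvider implements Provider {
    public Client create(Map<String, String> conf) {
      if (!"yarn".equals(conf.get(FRAMEWORK_KEY))) {
        return null;
      }
      return () -> "yarn-client";
    }
  }

  // Fixed list used here in place of ServiceLoader discovery.
  static final List<Provider> PROVIDERS =
      Arrays.asList(new LocalProvider(), new YarnProvider());

  /** Ask each provider in turn; the first non-null client is picked. */
  static Client select(List<Provider> providers, Map<String, String> conf) {
    for (Provider provider : providers) {
      Client client = provider.create(conf);
      if (client != null) {
        return client;
      }
    }
    // The real code throws IOException; an unchecked exception keeps the sketch short.
    throw new IllegalStateException(
        "Cannot initialize Cluster. Please check your configuration for " + FRAMEWORK_KEY);
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put(FRAMEWORK_KEY, "yarn");
    System.out.println(select(PROVIDERS, conf).name()); // prints "yarn-client"
  }
}
```

With an unrecognized framework name, every provider in the list returns null and select throws, which corresponds to the "Cannot initialize Cluster" error in the listing above.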
During its serviceStart phase, YarnClientImpl creates the RPC proxy; note the protocol it uses.
protected void serviceStart() throws Exception {
  try {
    rmClient = ClientRMProxy.createRMProxy(getConfig(),
        ApplicationClientProtocol.class);
  } catch (IOException e) {
    throw new YarnRuntimeException(e);
  }
  super.serviceStart();
}
The YARNRunner constructor is as follows:
public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
    ClientCache clientCache) {
  this.conf = conf;
  try {
    this.resMgrDelegate = resMgrDelegate;
    this.clientCache = clientCache;
    this.defaultFileContext = FileContext.getFileContext(this.conf);
  } catch (UnsupportedFileSystemException ufe) {
    throw new RuntimeException("Error in instantiating YarnClient", ufe);
  }
}
Now for the core of the submission, JobSubmitter.submitJobInternal:
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {
  // Validate the output directory: fails if it already exists or is not set
  checkSpecs(job);
  Configuration conf = job.getConfiguration();
  addMRFrameworkToDistributedCache(conf);
  // Get the staging area, which holds the files used while the job runs.
  // Default location: /tmp/hadoop-yarn/staging/root/.staging
  // Configurable via yarn.app.mapreduce.am.staging-dir
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  // Record the submitting host name and address
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  // Get a new JobID; this requires an RPC call
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  // Submit directory: /tmp/hadoop-yarn/staging/root/.staging/job_1395778831382_0002
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);
    populateTokenCache(conf, job.getCredentials());
    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }
    // Copy the required files to the cluster; analyzed separately below, see (1)
    copyAndConfigureFiles(job, submitJobDir);
    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
    // Write the split files job.split and job.splitmetainfo. The write path is
    // the same as in MR1; see the earlier articles
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);
    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    // Set the queue name
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);
    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }
    // Write job file to submit dir
    // Writes job.xml
    writeConf(conf, submitJobFile);
    //
    // Now, actually submit the job (using the submit name)
    // The real submission only starts here; see (2) below
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}
(1) The file copy proceeds as follows; the default replication factor is 10.
private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
    throws IOException {
  Configuration conf = job.getConfiguration();
  short replication = (short) conf.getInt(Job.SUBMIT_REPLICATION, 10);
  // Start copying
  copyAndConfigureFiles(job, jobSubmitDir, replication);
  // Set the working directory
  if (job.getWorkingDirectory() == null) {
    job.setWorkingDirectory(jtFs.getWorkingDirectory());
  }
}
The detailed file copy is shown below; the comments in the code explain it fairly clearly.
// configures -files, -libjars and -archives.
private void copyAndConfigureFiles(Job job, Path submitJobDir,
    short replication) throws IOException {
  Configuration conf = job.getConfiguration();
  if (!(conf.getBoolean(Job.USED_GENERIC_PARSER, false))) {
    LOG.warn("Hadoop command-line option parsing not performed. "
        + "Implement the Tool interface and execute your application "
        + "with ToolRunner to remedy this.");
  }
  // get all the command line arguments passed in by the user conf
  String files = conf.get("tmpfiles");
  String libjars = conf.get("tmpjars");
  String archives = conf.get("tmparchives");
  String jobJar = job.getJar();
  //
  // Figure out what fs the JobTracker is using. Copy the
  // job to it, under a temporary name. This allows DFS to work,
  // and under the local fs also provides UNIX-like object loading
  // semantics. (that is, if the job file is deleted right after
  // submission, we can still run the submission to completion)
  //
  // Create a number of filenames in the JobTracker's fs namespace
  LOG.debug("default FileSystem: " + jtFs.getUri());
  if (jtFs.exists(submitJobDir)) {
    throw new IOException("Not submitting job. Job directory " + submitJobDir
        + " already exists!! This is unexpected.Please check what's there in"
        + " that directory");
  }
  submitJobDir = jtFs.makeQualified(submitJobDir);
  submitJobDir = new Path(submitJobDir.toUri().getPath());
  FsPermission mapredSysPerms =
      new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
  // Create the working directory
  FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);
  Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
  Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
  Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
  // add all the command line files/ jars and archive
  // first copy them to jobtrackers filesystem
  // Create the directories above as needed
  if (files != null) {
    FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms);
    String[] fileArr = files.split(",");
    for (String tmpFile : fileArr) {
      URI tmpURI = null;
      try {
        tmpURI = new URI(tmpFile);
      } catch (URISyntaxException e) {
        throw new IllegalArgumentException(e);
      }
      Path tmp = new Path(tmpURI);
      Path newPath = copyRemoteFiles(filesDir, tmp, conf, replication);
      try {
        URI pathURI = getPathURI(newPath, tmpURI.getFragment());
        DistributedCache.addCacheFile(pathURI, conf);
      } catch (URISyntaxException ue) {
        // should not throw a uri exception
        throw new IOException("Failed to create uri for " + tmpFile, ue);
      }
    }
  }
  if (libjars != null) {
    FileSystem.mkdirs(jtFs, libjarsDir, mapredSysPerms);
    String[] libjarsArr = libjars.split(",");
    for (String tmpjars : libjarsArr) {
      Path tmp = new Path(tmpjars);
      Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
      DistributedCache.addFileToClassPath(
          new Path(newPath.toUri().getPath()), conf);
    }
  }
  if (archives != null) {
    FileSystem.mkdirs(jtFs, archivesDir, mapredSysPerms);
    String[] archivesArr = archives.split(",");
    for (String tmpArchives : archivesArr) {
      URI tmpURI;
      try {
        tmpURI = new URI(tmpArchives);
      } catch (URISyntaxException e) {
        throw new IllegalArgumentException(e);
      }
      Path tmp = new Path(tmpURI);
      Path newPath = copyRemoteFiles(archivesDir, tmp, conf, replication);
      try {
        URI pathURI = getPathURI(newPath, tmpURI.getFragment());
        DistributedCache.addCacheArchive(pathURI, conf);
      } catch (URISyntaxException ue) {
        // should not throw a uri exception
        throw new IOException("Failed to create uri for " + tmpArchives, ue);
      }
    }
  }
  if (jobJar != null) {
    // copy jar to JobTracker's fs
    // use jar name if job is not named.
    if ("".equals(job.getJobName())) {
      job.setJobName(new Path(jobJar).getName());
    }
    Path jobJarPath = new Path(jobJar);
    URI jobJarURI = jobJarPath.toUri();
    // If the job jar is already in fs, we don't need to copy it from local fs
    if (jobJarURI.getScheme() == null || jobJarURI.getAuthority() == null
        || !(jobJarURI.getScheme().equals(jtFs.getUri().getScheme())
            && jobJarURI.getAuthority().equals(
                jtFs.getUri().getAuthority()))) {
      // Copy wordcount.jar. Note that it is renamed to job.jar after the copy,
      // with a replication factor of 10
      copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir),
          replication);
      job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
    }
  } else {
    LOG.warn("No job jar file set. User classes may not be found. "
        + "See Job or Job#setJar(String).");
  }
  // set the timestamps of the archives and files
  // set the public/private visibility of the archives and files
  ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf);
  // get DelegationToken for each cached file
  ClientDistributedCacheManager.getDelegationTokens(conf,
      job.getCredentials());
}
(2) The actual job submission:
@Override
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
    throws IOException, InterruptedException {
  addHistoryToken(ts);
  // Construct necessary information to start the MR AM
  ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);
  // Submit to ResourceManager
  try {
    ApplicationId applicationId =
        resMgrDelegate.submitApplication(appContext);
    ApplicationReport appMaster = resMgrDelegate
        .getApplicationReport(applicationId);
    String diagnostics =
        (appMaster == null ?
            "application report is null" : appMaster.getDiagnostics());
    if (appMaster == null
        || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
        || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
      throw new IOException("Failed to run job : " + diagnostics);
    }
    return clientCache.getClient(jobId).getJobStatus(jobId);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
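Going back to step (1), the condition that decides whether the job jar has to be copied is easy to misread, so here is a minimal sketch of it on its own, using only java.net.URI. The class name, the method name needsCopy, and the host names nn1 and other are hypothetical; the boolean expression itself is taken from the copyAndConfigureFiles listing above. It assumes a path without a scheme yields a URI whose scheme is null, as the local path /root/wordcount-2.3.0.jar does here.

```java
import java.net.URI;

// Hypothetical helper that isolates the job-jar check from copyAndConfigureFiles.
class JarCopyCheckSketch {

  /**
   * Returns true when the jar must be copied to the job's file system:
   * either the jar URI is not fully qualified (no scheme or no authority),
   * or it points at a different file system than fsUri.
   */
  static boolean needsCopy(URI jobJarUri, URI fsUri) {
    return jobJarUri.getScheme() == null
        || jobJarUri.getAuthority() == null
        || !(jobJarUri.getScheme().equals(fsUri.getScheme())
            && jobJarUri.getAuthority().equals(fsUri.getAuthority()));
  }

  public static void main(String[] args) {
    URI fs = URI.create("hdfs://nn1:8020"); // hypothetical default file system

    // Plain local path, as in the WordCount example: no scheme, so it is copied.
    System.out.println(needsCopy(URI.create("/root/wordcount-2.3.0.jar"), fs));

    // Already on the same file system: no copy needed.
    System.out.println(needsCopy(URI.create("hdfs://nn1:8020/apps/wordcount.jar"), fs));

    // Same scheme but a different authority: copied.
    System.out.println(needsCopy(URI.create("hdfs://other:8020/apps/wordcount.jar"), fs));
  }
}
```

In the WordCount example the jar is given as a plain local path, so the first case applies and the jar ends up in the submit directory as job.jar.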