Submitting a MapReduce Job from the Client to YARN (Part 2)

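Time: 2022-09-17 08:34:14
The client submits an Application over the RPC protocol ClientRMProtocol; what it actually sends is a SubmitApplicationRequest. In the Hadoop 1.0 era, RPC used Writable as its serialization/deserialization framework. In Hadoop 2.0, YARN's RPC layer dropped Writable in favor of Protocol Buffers. Besides supporting multiple languages, the biggest advantage of Protocol Buffers is backward compatibility, which lets AMs, Clients, RMs and NMs of different versions still communicate with one another.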
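To make the backward-compatibility point concrete, here is a toy tag-length-value encoding. It is NOT the protobuf wire format (protobuf uses varint field numbers and wire types); it only illustrates the principle that lets an older reader skip a field added by a newer writer. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Toy tag-length-value format for illustration only; NOT the protobuf wire format.
public class TlvCompatDemo {

    // Writes each field as a 4-byte tag, a 4-byte length, then the UTF-8 value bytes.
    public static byte[] encode(Map<Integer, String> fields) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (Map.Entry<Integer, String> e : fields.entrySet()) {
            byte[] value = e.getValue().getBytes(StandardCharsets.UTF_8);
            byte[] header = ByteBuffer.allocate(8).putInt(e.getKey()).putInt(value.length).array();
            out.write(header, 0, header.length);
            out.write(value, 0, value.length);
        }
        return out.toByteArray();
    }

    // Reads every field but keeps only the tags this (older) reader knows; unknown ones are skipped.
    public static Map<Integer, String> decode(byte[] data, Set<Integer> knownTags) {
        Map<Integer, String> result = new TreeMap<>();
        ByteBuffer buf = ByteBuffer.wrap(data);
        while (buf.remaining() >= 8) {
            int tag = buf.getInt();
            byte[] value = new byte[buf.getInt()];
            buf.get(value); // the length prefix is what makes skipping an unknown field possible
            if (knownTags.contains(tag)) {
                result.put(tag, new String(value, StandardCharsets.UTF_8));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // A "newer" client adds field 3, which the "older" server was never taught about.
        Map<Integer, String> request = new LinkedHashMap<>();
        request.put(1, "application_0001");
        request.put(2, "alice");
        request.put(3, "some-new-field");
        System.out.println(decode(encode(request), Set.of(1, 2)));
        // prints {1=application_0001, 2=alice}
    }
}
```

The old reader neither fails nor misinterprets field 3; it simply ignores it, which is the property that lets mixed-version daemons interoperate.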
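Taking SubmitApplicationRequest as an example, the source code ships only a PB implementation, SubmitApplicationRequestPBImpl. Users can, of course, provide implementations based on other serialization frameworks, including Writable.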
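The interface/implementation split can be sketched as follows: callers program against a record interface, and a factory locates the implementation class by a naming convention. This is a hypothetical, self-contained mimic. Hadoop's own RecordFactoryPBImpl also resolves its PBImpl classes by class-name convention, but its exact package rule is not reproduced here, and a plain field stands in for the protobuf builder that a real PBImpl wraps.

```java
// Hypothetical mimic of the "record interface + PBImpl" pattern; not Hadoop's actual classes.
public class RecordFactoryDemo {

    // Record interface, in the spirit of SubmitApplicationRequest.
    public interface SubmitApplicationRequest {
        String getApplicationName();
        void setApplicationName(String name);
    }

    // One concrete implementation. A plain field stands in for the protobuf builder (assumption).
    public static class SubmitApplicationRequestPBImpl implements SubmitApplicationRequest {
        private String applicationName;

        public SubmitApplicationRequestPBImpl() {}

        @Override public String getApplicationName() { return applicationName; }
        @Override public void setApplicationName(String name) { this.applicationName = name; }
    }

    // Resolves "<InterfaceBinaryName>PBImpl" and instantiates it reflectively.
    public static <T> T newRecordInstance(Class<T> iface) {
        String implName = iface.getName() + "PBImpl";
        try {
            Class<?> impl = Class.forName(implName);
            return iface.cast(impl.getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("No implementation class " + implName, e);
        }
    }

    public static void main(String[] args) {
        SubmitApplicationRequest req = newRecordInstance(SubmitApplicationRequest.class);
        req.setApplicationName("wordcount");
        System.out.println(req.getClass().getSimpleName() + " " + req.getApplicationName());
        // prints SubmitApplicationRequestPBImpl wordcount
    }
}
```

Swapping in a Writable-backed implementation would mean providing another class and pointing the factory at it; callers that only see the interface would not change.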
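SubmitApplicationRequest wraps the submission context, an ApplicationSubmissionContext. On the RM side, ClientRMProtocol is implemented by ClientRMService. The RM initializes this service at startup, and the service listens for RPC requests on the address configured by yarn.resourcemanager.address.
On the server side, ClientRMService builds an RMAppManagerSubmitEvent from the submission context and passes it to rmAppManager by calling its handle method directly. Within the RM, RMAppManager manages all applications: submitting and completing applications, recovering a set of applications, and so on.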
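As a small illustration of the listening-address setting, the sketch below resolves a host:port value for yarn.resourcemanager.address and falls back to a default. java.util.Properties stands in for Hadoop's Configuration, and the default 0.0.0.0:8032 is an assumption (it is the default in Hadoop 2.x releases; check yarn-default.xml for your version). The class and method names are hypothetical.

```java
import java.net.InetSocketAddress;
import java.util.Properties;

// Hypothetical helper; Properties stands in for Hadoop's Configuration.
public class RmAddressDemo {
    static final String RM_ADDRESS_KEY = "yarn.resourcemanager.address";
    // Assumed default, matching Hadoop 2.x's 0.0.0.0:8032.
    static final String DEFAULT_RM_ADDRESS = "0.0.0.0:8032";

    // Resolves the address the client-facing RPC service would listen on.
    public static InetSocketAddress rmAddress(Properties conf) {
        String value = conf.getProperty(RM_ADDRESS_KEY, DEFAULT_RM_ADDRESS).trim();
        int colon = value.lastIndexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("Expected host:port, got " + value);
        }
        String host = value.substring(0, colon);
        int port = Integer.parseInt(value.substring(colon + 1));
        // createUnresolved skips DNS; a real server would resolve the host before binding.
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        System.out.println(rmAddress(conf).getPort()); // prints 8032
        conf.setProperty(RM_ADDRESS_KEY, "rm.example.com:9032");
        InetSocketAddress addr = rmAddress(conf);
        System.out.println(addr.getHostString() + " " + addr.getPort()); // prints rm.example.com 9032
    }
}
```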
The submitApplication method of ClientRMService:
  public SubmitApplicationResponse submitApplication(
      SubmitApplicationRequest request) throws YarnRemoteException {
    ApplicationSubmissionContext submissionContext = request
        .getApplicationSubmissionContext();
    ApplicationId applicationId = submissionContext.getApplicationId();
    String user = submissionContext.getUser();
    try {
      // Overwrite the client-supplied user with the identity of the RPC caller
      user = UserGroupInformation.getCurrentUser().getShortUserName();
      if (rmContext.getRMApps().get(applicationId) != null) {
        throw new IOException("Application with id " + applicationId
            + " is already present! Cannot add a duplicate!");
      }
      submissionContext.setUser(user);
      // Hand the application to rmAppManager, which goes on to launch the ApplicationMaster
      rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System
          .currentTimeMillis()));

      // If recovery is enabled then store the application information in a
      // blocking call so make sure that RM has stored the information needed
      // to restart the AM after RM restart without further client communication
      RMStateStore stateStore = rmContext.getStateStore();
      LOG.info("Storing Application with id " + applicationId);
      try {
        stateStore.storeApplication(rmContext.getRMApps().get(applicationId));
      } catch (Exception e) {
        LOG.error("Failed to store application:" + applicationId, e);
        ExitUtil.terminate(1, e);
      }
      LOG.info("Application with id " + applicationId.getId() +
          " submitted by user " + user);
      RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
          "ClientRMService", applicationId);
    } catch (IOException ie) {
      // The method only declares YarnRemoteException, so wrap checked IOExceptions
      LOG.info("Exception in submitting application", ie);
      throw RPCUtil.getRemoteException(ie);
    }

    SubmitApplicationResponse response = recordFactory
        .newRecordInstance(SubmitApplicationResponse.class);
    return response;
  }
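The get() check in submitApplication above is check-then-act: if two requests with the same id were handled concurrently, both could pass it. The putIfAbsent call in RMAppManager.submitApplication is what makes duplicate detection atomic. A minimal sketch of that guarantee with ConcurrentHashMap; the map, ids and method names are hypothetical stand-ins for rmContext.getRMApps().

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the RM's application map.
public class DuplicateSubmitDemo {
    static final ConcurrentMap<String, String> apps = new ConcurrentHashMap<>();

    // Returns true if this call registered the application, false if it was a duplicate.
    public static boolean register(String appId, String name) {
        // putIfAbsent is atomic: among concurrent callers, exactly one sees null.
        return apps.putIfAbsent(appId, name) == null;
    }

    // Fires the same submission from several threads at once and counts how many were accepted.
    public static int raceAccepted(int threads) {
        apps.clear();
        AtomicInteger accepted = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // line all threads up before registering
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (register("application_0001", "wordcount")) {
                    accepted.incrementAndGet();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return accepted.get();
    }

    public static void main(String[] args) {
        System.out.println(raceAccepted(8)); // prints 1
    }
}
```

A separate get() followed by put() would give no such guarantee; the early get() in ClientRMService is best read as a fast-path rejection, with putIfAbsent as the authoritative check.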

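YARN uses an event-driven programming model: a state change can trigger one or more events, which in turn can cause further state changes. RMAppManagerEvent has two event types, APP_SUBMIT and APP_COMPLETED (an RMAppManagerSubmitEvent always carries APP_SUBMIT). When RMAppManager handles an APP_COMPLETED event, it calls finishApplication, which adds the ApplicationId to the completedApps queue. For APP_SUBMIT, it calls submitApplication, which creates an RMApp, puts it into the RMContext's Map<ApplicationId, RMApp> applications, and then creates an RMAppEvent of type START and hands it to the AsyncDispatcher.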
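The asynchronous hand-off can be sketched with a queue drained by a single worker thread. This is a hypothetical, much-simplified mimic of YARN's AsyncDispatcher: as there, handlers are registered per event-type enum class and producers only enqueue, so submitApplication can return before the RMApp has processed START. Service lifecycle, error handling and draining on shutdown are omitted.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, simplified mimic of an asynchronous event dispatcher.
public class MiniDispatcher {
    enum AppEventType { START, APP_REJECTED }

    // Hypothetical event: an application id plus an enum event type.
    static final class AppEvent {
        final String appId;
        final Enum<?> type;
        AppEvent(String appId, Enum<?> type) { this.appId = appId; this.type = type; }
    }

    private final BlockingQueue<AppEvent> queue = new LinkedBlockingQueue<>();
    // Handlers keyed by the enum class of the event type.
    private final Map<Class<?>, Consumer<AppEvent>> handlers = new ConcurrentHashMap<>();
    private final Thread worker = new Thread(this::loop, "dispatcher");

    public void register(Class<? extends Enum<?>> typeClass, Consumer<AppEvent> handler) {
        handlers.put(typeClass, handler);
    }

    // Producers only enqueue; the caller returns immediately.
    public void handle(AppEvent event) {
        queue.add(event);
    }

    // Single consumer: events are delivered one at a time, in FIFO order.
    private void loop() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                AppEvent event = queue.take();
                Consumer<AppEvent> handler = handlers.get(event.type.getDeclaringClass());
                if (handler != null) {
                    handler.accept(event);
                }
            }
        } catch (InterruptedException e) {
            // Interrupted by stop(): leave the loop.
        }
    }

    public void start() { worker.start(); }

    public void stop() throws InterruptedException {
        worker.interrupt();
        worker.join();
    }

    public static List<String> demo() {
        MiniDispatcher dispatcher = new MiniDispatcher();
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        dispatcher.register(AppEventType.class, e -> {
            seen.add(e.appId + ":" + e.type);
            done.countDown();
        });
        dispatcher.start();
        dispatcher.handle(new AppEvent("application_0001", AppEventType.START));
        dispatcher.handle(new AppEvent("application_0002", AppEventType.START));
        try {
            done.await(5, TimeUnit.SECONDS);
            dispatcher.stop();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [application_0001:START, application_0002:START]
    }
}
```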
[Figure: call stack]

RMAppManager's handle method:
  public void handle(RMAppManagerEvent event) {
    ApplicationId applicationId = event.getApplicationId();
    LOG.debug("RMAppManager processing event for "
        + applicationId + " of type " + event.getType());
    switch(event.getType()) {
      case APP_COMPLETED:
      {
        finishApplication(applicationId);
        ApplicationSummary.logAppSummary(
            rmContext.getRMApps().get(applicationId));
        checkAppNumCompletedLimit();
      }
      break;
      case APP_SUBMIT:
      {
        ApplicationSubmissionContext submissionContext =
            ((RMAppManagerSubmitEvent)event).getSubmissionContext();
        long submitTime = ((RMAppManagerSubmitEvent)event).getSubmitTime();
        submitApplication(submissionContext, submitTime);
      }
      break;
      default:
        LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
    }
  }
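checkAppNumCompletedLimit is not shown here. Roughly, it evicts the oldest completed applications once their number exceeds a configured maximum (yarn.resourcemanager.max-completed-applications in Hadoop 2.x). A simplified, hypothetical sketch of that bookkeeping, with a plain HashMap standing in for the RM's application map and strings standing in for application states:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of bounding the number of completed applications kept in memory.
public class CompletedAppsDemo {
    private final int maxCompletedApps;
    private final Map<String, String> apps = new HashMap<>();       // stand-in for rmContext.getRMApps()
    private final Deque<String> completedApps = new ArrayDeque<>(); // oldest completion first

    public CompletedAppsDemo(int maxCompletedApps) {
        this.maxCompletedApps = maxCompletedApps;
    }

    public void submit(String appId) {
        apps.put(appId, "RUNNING");
    }

    // Roughly finishApplication followed by checkAppNumCompletedLimit (simplified).
    public void complete(String appId) {
        apps.put(appId, "FINISHED");
        completedApps.addLast(appId);
        while (completedApps.size() > maxCompletedApps) {
            String oldest = completedApps.removeFirst();
            apps.remove(oldest); // forget the application entirely once it falls off the queue
        }
    }

    public boolean isTracked(String appId) {
        return apps.containsKey(appId);
    }

    public static void main(String[] args) {
        CompletedAppsDemo rm = new CompletedAppsDemo(2);
        for (int i = 1; i <= 3; i++) {
            String id = "app_" + i;
            rm.submit(id);
            rm.complete(id);
        }
        System.out.println(rm.isTracked("app_1") + " " + rm.isTracked("app_3")); // prints false true
    }
}
```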

The submitApplication logic in RMAppManager:
  protected void submitApplication(
      ApplicationSubmissionContext submissionContext, long submitTime) {
    ApplicationId applicationId = submissionContext.getApplicationId();
    RMApp application = null;
    try {
      // Create RMApp
      application =
          new RMAppImpl(applicationId, rmContext, this.conf,
              submissionContext.getApplicationName(),
              submissionContext.getUser(), submissionContext.getQueue(),
              submissionContext, this.scheduler, this.masterService,
              submitTime);

      // Sanity check: putIfAbsent atomically rejects a duplicate application id
      if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
          null) {
        String message = "Application with id " + applicationId
            + " is already present! Cannot add a duplicate!";
        LOG.info(message);
        throw new IOException(message);
      }

      // All done, start the RMApp
      this.rmContext.getDispatcher().getEventHandler().handle(
          new RMAppEvent(applicationId, RMAppEventType.START));
    } catch (IOException ie) {
      LOG.info("RMAppManager submit application exception", ie);
      if (application != null) {
        // Sending APP_REJECTED is fine, since we assume that the
        // RMApp is in NEW state and thus we haven't yet informed the
        // Scheduler about the existence of the application
        this.rmContext.getDispatcher().getEventHandler().handle(
            new RMAppRejectedEvent(applicationId, ie.getMessage()));
      }
    }
  }
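Once START (or APP_REJECTED) is dispatched, RMAppImpl reacts through its state machine. The sketch below is a simplified transition table covering only these two events, assuming the Hadoop 2.0.x transitions NEW --START--> SUBMITTED and NEW --APP_REJECTED--> FAILED. The real RMAppImpl builds its table with StateMachineFactory and has many more states, and later releases insert intermediate states; class and method names here are hypothetical.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical, simplified state table for an application's first transitions.
public class RmAppStateDemo {
    enum RMAppState { NEW, SUBMITTED, FAILED }
    enum RMAppEventType { START, APP_REJECTED }

    // (current state, event) -> next state. Only transitions out of NEW are modeled (assumption).
    private static final Map<RMAppState, Map<RMAppEventType, RMAppState>> TRANSITIONS =
            new EnumMap<>(RMAppState.class);

    static {
        Map<RMAppEventType, RMAppState> fromNew = new EnumMap<>(RMAppEventType.class);
        fromNew.put(RMAppEventType.START, RMAppState.SUBMITTED);
        fromNew.put(RMAppEventType.APP_REJECTED, RMAppState.FAILED);
        TRANSITIONS.put(RMAppState.NEW, fromNew);
    }

    private RMAppState state = RMAppState.NEW;

    // Applies one event; any (state, event) pair missing from the table is rejected.
    public RMAppState handle(RMAppEventType event) {
        Map<RMAppEventType, RMAppState> row = TRANSITIONS.get(state);
        RMAppState next = (row == null) ? null : row.get(event);
        if (next == null) {
            throw new IllegalStateException("Invalid event " + event + " in state " + state);
        }
        state = next;
        return state;
    }

    public static void main(String[] args) {
        RmAppStateDemo accepted = new RmAppStateDemo();
        System.out.println(accepted.handle(RMAppEventType.START));        // prints SUBMITTED
        RmAppStateDemo rejected = new RmAppStateDemo();
        System.out.println(rejected.handle(RMAppEventType.APP_REJECTED)); // prints FAILED
    }
}
```

This is why the comment in the catch block matters: APP_REJECTED is only a valid event while the RMApp is still in NEW.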


Original article: http://blog.csdn.net/lalaguozhe/article/details/10142707. Please credit the source when reposting.