Following the previous article's analysis of the local MapReduce job submission process,
this article again uses a debugger to analyze how a job is submitted to the cluster.
Cluster submission differs from local submission: the local path goes through LocalJobRunner, while the cluster path goes through YARNRunner.
The MapReduce program, the debugging steps, and the material covered in this article have all been packaged and uploaded; see the download link.
Whether LocalJobRunner or YARNRunner is used is determined by the configuration file:
[mapred-site.xml]
mapreduce.framework.name = local | yarn
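As a rough illustration of that switch, the sketch below shows how the framework name selects the runner. This is a simplification: the real selection happens in Cluster.initialize(), which iterates ClientProtocolProvider implementations (LocalClientProtocolProvider, YarnClientProtocolProvider) discovered through a ServiceLoader.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.LocalJobRunner;
import org.apache.hadoop.mapred.YARNRunner;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;

public class RunnerSelectionSketch {
  // Simplified sketch: pick the submission client from mapreduce.framework.name.
  static ClientProtocol pickRunner(Configuration conf) throws Exception {
    String framework = conf.get("mapreduce.framework.name", "local");
    if ("yarn".equals(framework)) {
      return new YARNRunner(conf);   // cluster submission path
    }
    return new LocalJobRunner(conf); // local submission path
  }
}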
Sketch of the cluster job submission process
First, a condensed sketch of the submission call chain, to make the key-code walkthrough below easier to follow:
job.submit()
--> JobSubmitter.submitJobInternal()
--> YARNRunner.submitJob(..)
--> ResourceMgrDelegate.submitApplication(appContext)
--> YarnClient.submitApplication(appContext)
--> YarnClientImpl.submitApplication(...)
        SubmitApplicationRequest request = ... // build the request message
--> ApplicationClientProtocolPBClientImpl.submitApplication(request)
        SubmitApplicationRequestProto requestProto = request... // convert the request object into the submit-application protocol message
--> ProtobufRpcEngine$Invoker.invoke();
        RequestHeaderProto header = constructRpcRequestHeader(method); // message header
        Message body = (Message) args[1]; // message body
        RpcRequestWrapper wrapper = (header, body); // header + body combined into one wrapper
--> ipc.Client.call(...)
        client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
            new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
            fallbackToSimpleAuth);
--> ipc.Client.call(...)
        Call call = createCall(rpcKind, reqWrapper);
--> Connection.sendRpcRequest(call);
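For context, here is a minimal driver that triggers this entire chain. This is a hypothetical sketch (identity Mapper/Reducer as placeholders); only the job name "Max temperature" is taken from the run shown in the appendix.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SubmitDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("mapreduce.framework.name", "yarn"); // force the YARNRunner path
    Job job = Job.getInstance(conf, "Max temperature");
    job.setJarByClass(SubmitDemo.class);
    job.setMapperClass(Mapper.class);   // identity mapper (placeholder)
    job.setReducerClass(Reducer.class); // identity reducer (placeholder)
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // waitForCompletion() -> submit() -> ... -> YARNRunner.submitJob(), as in the chain above
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}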
Source code walkthrough
org.apache.hadoop.mapreduce.Job class
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {
// perform the submission
submit();
}
...
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
// connect: creates the Cluster object, which selects LocalJobRunner or YARNRunner based on mapreduce.framework.name
connect();
// obtain a JobSubmitter (the submitter)
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
// calls submitJobInternal() on the JobSubmitter,
// i.e. the job submitter's internal submission method
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
org.apache.hadoop.mapreduce.JobSubmitter class
// the internal submission process starts here
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
//validate the jobs output specs
// first validate the job's output specification (e.g. that the output path is set and does not already exist)
checkSpecs(job);
Configuration conf = job.getConfiguration();
addMRFrameworkToDistributedCache(conf);
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//configure the command line options correctly on the submitting dfs
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
submitHostAddress = ip.getHostAddress();
submitHostName = ip.getHostName();
conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
}
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
JobStatus status = null;
try {
conf.set(MRJobConfig.USER_NAME,
UserGroupInformation.getCurrentUser().getShortUserName());
conf.set("hadoop.http.filter.initializers",
"org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
LOG.debug("Configuring job " + jobId + " with " + submitJobDir
+ " as the submit dir");
// get delegation token for the dir
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { submitJobDir }, conf);
populateTokenCache(conf, job.getCredentials());
// generate a secret to authenticate shuffle transfers
if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
KeyGenerator keyGen;
try {
keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
keyGen.init(SHUFFLE_KEY_LENGTH);
} catch (NoSuchAlgorithmException e) {
throw new IOException("Error generating shuffle secret key", e);
}
SecretKey shuffleKey = keyGen.generateKey();
TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
job.getCredentials());
}
if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
"data spill is enabled");
}
copyAndConfigureFiles(job, submitJobDir);
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
// write "queue admins of the queue to which job is being submitted"
// to job file.
String queue = conf.get(MRJobConfig.QUEUE_NAME,
JobConf.DEFAULT_QUEUE_NAME);
AccessControlList acl = submitClient.getQueueAdmins(queue);
conf.set(toFullPropertyName(queue,
QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
// removing jobtoken referrals before copying the jobconf to HDFS
// as the tasks don't need this setting, actually they may break
// because of it if present as the referral will point to a
// different job.
TokenCache.cleanUpTokenReferral(conf);
if (conf.getBoolean(
MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
// Add HDFS tracking ids
ArrayList<String> trackingIds = new ArrayList<String>();
for (Token<? extends TokenIdentifier> t :
job.getCredentials().getAllTokens()) {
trackingIds.add(t.decodeIdentifier().getTrackingId());
}
conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
trackingIds.toArray(new String[trackingIds.size()]));
}
// Set reservation info if it exists
ReservationId reservationId = job.getReservationId();
if (reservationId != null) {
conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
}
// Write job file to submit dir
// write the job configuration file into the submit directory on HDFS
writeConf(conf, submitJobFile);
//
// Now, actually submit the job (using the submit name)
//
printTokens(jobId, job.getCredentials());
// submitClient here is the YARNRunner; call its submitJob() method
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) {
return status;
} else {
throw new IOException("Could not launch job");
}
} finally {
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (jtFs != null && submitJobDir != null)
jtFs.delete(submitJobDir, true);
}
}
}
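At this point everything the job needs has been uploaded to the staging area. In this run the submit directory is hdfs://s100:8020/tmp/hadoop-yarn/staging/zhaotao/.staging/job_1494680303350_0001/ and, as the appendix dump confirms, it holds four files: job.jar, job.split, job.splitmetainfo and job.xml.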
org.apache.hadoop.mapred.YARNRunner class
@Override
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException {
addHistoryToken(ts);
// Construct necessary information to start the MR AM
// build the context object (ApplicationSubmissionContext)
ApplicationSubmissionContext appContext =
createApplicationSubmissionContext(conf, jobSubmitDir, ts);
// Submit to ResourceManager
try {
// call the submit-application method on the ResourceManager delegate
// resMgrDelegate is a ResourceMgrDelegate (the ResourceManager proxy object); the debugger shows it as:
// Service org.apache.hadoop.mapred.ResourceMgrDelegate in state org.apache.hadoop.mapred.ResourceMgrDelegate: STARTED
ApplicationId applicationId =
resMgrDelegate.submitApplication(appContext);
ApplicationReport appMaster = resMgrDelegate
.getApplicationReport(applicationId);
String diagnostics =
(appMaster == null ?
"application report is null" : appMaster.getDiagnostics());
if (appMaster == null
|| appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
|| appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
throw new IOException("Failed to run job : " +
diagnostics);
}
return clientCache.getClient(jobId).getJobStatus(jobId);
} catch (YarnException e) {
throw new IOException(e);
}
}
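The ApplicationSubmissionContext built by createApplicationSubmissionContext() is exactly what later travels over the wire: the appendix at the end of this article shows its serialized form, including the AM container spec (local resources, environment variables, and the command that launches org.apache.hadoop.mapreduce.v2.app.MRAppMaster), the resource request (1536 MB, 1 vcore), the queue and the application type MAPREDUCE.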
org.apache.hadoop.mapred.ResourceMgrDelegate class
@Override
public ApplicationId
submitApplication(ApplicationSubmissionContext appContext)
throws YarnException, IOException {
// the debugger shows client as: Service org.apache.hadoop.yarn.client.api.impl.YarnClientImpl in state org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: STARTED
// so this delegates to YarnClientImpl's submitApplication() method
return client.submitApplication(appContext);
}
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl class
@Override
public ApplicationId
submitApplication(ApplicationSubmissionContext appContext)
throws YarnException, IOException {
ApplicationId applicationId = appContext.getApplicationId();
if (applicationId == null) {
throw new ApplicationIdNotProvidedException(
"ApplicationId is not provided in ApplicationSubmissionContext");
}
// build a submit-application request object
SubmitApplicationRequest request =
Records.newRecord(SubmitApplicationRequest.class);
request.setApplicationSubmissionContext(appContext);
...
// rmClient is an ApplicationClientProtocolPBClientImpl,
// the protobuf client implementation of the application client protocol; its submit-application method is called next with the request object:
// ApplicationClientProtocolPBClientImpl.submitApplication()
rmClient.submitApplication(request);
...
...
}
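The elided code after rmClient.submitApplication(request) is a polling loop: YarnClientImpl repeatedly asks the ResourceManager for an application report until the application has left the NEW/NEW_SAVING states. Roughly (a paraphrase of the loop body; details vary across Hadoop versions):

// Paraphrased from YarnClientImpl.submitApplication(): wait until the RM
// has accepted the application before returning the ApplicationId.
while (true) {
  ApplicationReport appReport = getApplicationReport(applicationId);
  YarnApplicationState state = appReport.getYarnApplicationState();
  if (state != YarnApplicationState.NEW
      && state != YarnApplicationState.NEW_SAVING) {
    break; // submitted successfully (SUBMITTED/ACCEPTED or beyond)
  }
  Thread.sleep(submitPollIntervalMillis); // configurable poll interval
}
return applicationId;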
org.apache.hadoop.io.retry.RetryInvocationHandler class
invoke(...){
...
// reflection
// method: public abstract org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse org.apache.hadoop.yarn.api.ApplicationClientProtocol.submitApplication(org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest) throws org.apache.hadoop.yarn.exceptions.YarnException,java.io.IOException
// in effect, method is the submitApplication() method
// and args carries the request
Object ret = invokeMethod(method, args);
...
}
...
// invocation via Java reflection
protected Object invokeMethod(Method method, Object[] args) throws Throwable {
try {
if (!method.isAccessible()) {
// make the method accessible
method.setAccessible(true);
}
// perform the reflective call
return method.invoke(currentProxy.proxy, args);
} catch (InvocationTargetException e) {
throw e.getCause();
}
}
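Why does calling rmClient.submitApplication(request) land in RetryInvocationHandler.invoke() at all? Because rmClient is a JDK dynamic proxy whose InvocationHandler is the RetryInvocationHandler. A minimal standalone demo of that mechanism (hypothetical Greeter interface, unrelated to Hadoop):

import java.lang.reflect.Proxy;

public class ProxyDemo {
  interface Greeter { String greet(String name); }

  public static void main(String[] args) {
    Greeter g = (Greeter) Proxy.newProxyInstance(
        Greeter.class.getClassLoader(),
        new Class<?>[] { Greeter.class },
        (proxy, method, a) -> {
          // Every call on the proxy lands here, just as
          // rmClient.submitApplication(request) lands in invoke().
          System.out.println("intercepted: " + method.getName());
          return "hello " + a[0];
        });
    System.out.println(g.greet("yarn")); // prints "intercepted: greet", then "hello yarn"
  }
}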
java.lang.reflect.Method class
@CallerSensitive
public Object invoke(Object obj, Object... args)
throws IllegalAccessException, IllegalArgumentException,
InvocationTargetException
{
if (!override) {
if (!Reflection.quickCheckMemberAccess(clazz, modifiers)) {
Class<?> caller = Reflection.getCallerClass();
checkAccess(caller, clazz, obj, modifiers);
}
}
// the method accessor
MethodAccessor ma = methodAccessor; // read volatile
if (ma == null) {
ma = acquireMethodAccessor();
}
// stepping into this call performs the actual reflective invocation
return ma.invoke(obj, args);
}
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl class
// reached via reflection: in rmClient.submitApplication(request), rmClient is in fact an ApplicationClientProtocolPBClientImpl,
// so the call arrives at ApplicationClientProtocolPBClientImpl.submitApplication()
@Override
public SubmitApplicationResponse submitApplication(
SubmitApplicationRequest request) throws YarnException,
IOException {
// build the submit-application request protocol message
SubmitApplicationRequestProto requestProto =
((SubmitApplicationRequestPBImpl) request).getProto();
try {
// invoke the proxy's submit-application method; stepping in lands directly in org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker
return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null,
requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
org.apache.hadoop.ipc.ProtobufRpcEngine class
Invoker{
...
public Object invoke(Object proxy, Method method, Object[] args)
throws ServiceException {
// build the RPC request header
// the debugger shows rpcRequestHeader as:
//   methodName: "submitApplication"
//   declaringClassProtocolName: "org.apache.hadoop.yarn.api.ApplicationClientProtocolPB"
//   clientProtocolVersion: 1
// and its internal fields:
//   bitField0_ 7
//   clientProtocolVersion_ 1
//   declaringClassProtocolName_ "org.apache.hadoop.yarn.api.ApplicationClientProtocolPB" (id=4069)
//   memoizedHashCode 0
//   memoizedIsInitialized 1
//   memoizedSerializedSize -1
//   memoizedSize -1
//   methodName_ "submitApplication" (id=4073)
//   unknownFields UnknownFieldSet (id=4075)
RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
...
// the message body is simply the second argument (args[1]); theRequest's concrete value can be inspected at the debug step described earlier
// theRequest is the protobuf submit-application request; its application_submission_context payload is what the appendix shows
// theRequest prints in the JSON-like protobuf text format; the full dump is long, so it is given in the appendix
Message theRequest = (Message) args[1];
...
// the wrapper object
final RpcResponseWrapper val;
try {
// construct the RpcRequestWrapper and pass it straight into org.apache.hadoop.ipc.Client.call()
// this reaches down into Hadoop's IPC layer
// the call carries the wrapped request built above, the RPC kind, and the remote socket address (port 8032, i.e. the ResourceManager)
val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
// header + message together form the wrapper
new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
fallbackToSimpleAuth);
} catch (Throwable e) {
...
}
...
}
...
}
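For reference, constructRpcRequestHeader() assembles essentially the header shown in the debugger dump above. A hedged sketch of the equivalent builder calls (RequestHeaderProto is generated protobuf code in org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos; this is a fragment, not the actual method body):

import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto;

// Sketch: build the same header the debugger showed for submitApplication().
RequestHeaderProto header = RequestHeaderProto.newBuilder()
    .setMethodName("submitApplication")
    .setDeclaringClassProtocolName(
        "org.apache.hadoop.yarn.api.ApplicationClientProtocolPB")
    .setClientProtocolVersion(1)
    .build();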
org.apache.hadoop.ipc.Client class
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT,
fallbackToSimpleAuth);
}
...
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, int serviceClass,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
// create a Call object from the RPC operation kind and the request
// in essence the RpcRequestWrapper built above is what gets passed in
final Call call = createCall(rpcKind, rpcRequest);
// obtain a connection; in this run the debugger shows it as:
// Thread[IPC Client (975033189) connection to s100/192.168.26.100:8032 from zhaotao,5,main]
Connection connection = getConnection(remoteId, call, serviceClass,
fallbackToSimpleAuth);
...
// hand the call object to the connection, which sends the RPC request
connection.sendRpcRequest(call);
...
sendRpcRequest(..){
...
// a local output buffer into which the call will first be serialized
final DataOutputBuffer d = new DataOutputBuffer();
// build the RPC request header
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
clientId);
// write the (length-delimited) header into the buffer
header.writeDelimitedTo(d);
// then write the call's RPC request into the buffer as well
call.rpcRequest.write(d);
// everything is now in the DataOutputBuffer, which extends DataOutputStream
...
...
// synchronized so that only one request is sent at a time
synchronized (sendRpcRequestLock) {
// senderFuture is the task the thread pool will run at some later point
Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
@Override
public void run() {
try {
// Connection.this.out is the connection's output stream (the data stream of the connection)
synchronized (Connection.this.out) {
if (shouldCloseConnection.get()) {
return;
}
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
// d holds the locally buffered bytes; fetch all of them
byte[] data = d.getData();
// and their total length
int totalLength = d.getLength();
// write the length to the output stream
out.writeInt(totalLength); // Total Length
// then write the data itself
out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest
// flush the output stream
out.flush();
// at this point the client has finished sending the message to the server
// out: DataOutputStream --> BufferedOutputStream --> SocketOutputStream --> SocketOutputStream$Writer --> SocketChannel
}
}
...
}
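The wire format is therefore [4-byte length][RpcRequestHeader][RpcRequest]. A tiny self-contained illustration of the same framing pattern (a hypothetical payload, with ByteArrayOutputStream standing in for the socket stream):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class FramingDemo {
  public static void main(String[] args) throws IOException {
    byte[] payload = "header+request".getBytes(StandardCharsets.UTF_8); // stand-in for d.getData()
    ByteArrayOutputStream socket = new ByteArrayOutputStream();         // stand-in for the socket stream
    DataOutputStream out = new DataOutputStream(socket);
    out.writeInt(payload.length);          // Total Length
    out.write(payload, 0, payload.length); // RpcRequestHeader + RpcRequest
    out.flush();
    System.out.println("frame bytes = " + socket.size()); // 4 + payload length
  }
}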
Diagram of the cluster job submission flow
Note: if the image is unclear, right-click it and open it in a new tab to zoom in. Apologies, image formatting is not my strong suit.
Appendix
The value of theRequest in the statement Message theRequest = (Message) args[1] in the org.apache.hadoop.ipc.ProtobufRpcEngine class:
application_submission_context {
application_id {
id: 1
cluster_timestamp: 1494680303350
}
application_name: "Max temperature"
queue: "default"
am_container_spec {
localResources {
key: "jobSubmitDir/job.splitmetainfo"
value {
resource {
scheme: "hdfs"
host: "s100"
port: 8020
file: "/tmp/hadoop-yarn/staging/zhaotao/.staging/job_1494680303350_0001/job.splitmetainfo"
}
size: 52
timestamp: 1494681000611
type: FILE
visibility: APPLICATION
}
}
localResources {
key: "job.jar"
value {
resource {
scheme: "hdfs"
host: "s100"
port: 8020
file: "/tmp/hadoop-yarn/staging/zhaotao/.staging/job_1494680303350_0001/job.jar"
}
size: 6739
timestamp: 1494680999584
type: PATTERN
visibility: APPLICATION
pattern: "(?:classes/|lib/).*"
}
}
localResources {
key: "jobSubmitDir/job.split"
value {
resource {
scheme: "hdfs"
host: "s100"
port: 8020
file: "/tmp/hadoop-yarn/staging/zhaotao/.staging/job_1494680303350_0001/job.split"
}
size: 195
timestamp: 1494681000515
type: FILE
visibility: APPLICATION
}
}
localResources {
key: "job.xml"
value {
resource {
scheme: "hdfs"
host: "s100"
port: 8020
file: "/tmp/hadoop-yarn/staging/zhaotao/.staging/job_1494680303350_0001/job.xml"
}
size: 98114
timestamp: 1494681001303
type: FILE
visibility: APPLICATION
}
}
tokens: "HDTS\000\000\001\025MapReduceShuffleToken\b\336U\020\246\317\t\253\315"
environment {
key: "HADOOP_CLASSPATH"
value: "$PWD:job.jar/job.jar:job.jar/classes/:job.jar/lib/*:$PWD/*:/soft/hadoop/contrib/capacity-scheduler/*.jar"
}
environment {
key: "SHELL"
value: "/bin/bash"
}
environment {
key: "CLASSPATH"
value: "$PWD:$HADOOP_CONF_DIR:$HADOOP_COMMON_HOME/share/hadoop/common/*:$HADOOP_COMMON_HOME/share/hadoop/common/lib/*:$HADOOP_HDFS_HOME/share/hadoop/hdfs/*:$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*:$HADOOP_YARN_HOME/share/hadoop/yarn/*:$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*:job.jar/job.jar:job.jar/classes/:job.jar/lib/*:$PWD/*"
}
environment {
key: "LD_LIBRARY_PATH"
value: "$PWD:{{HADOOP_COMMON_HOME}}/lib/native"
}
command: "$JAVA_HOME/bin/java -Djava.io.tmpdir=$PWD/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=<LOG_DIR> -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=INFO,CLA -Dhadoop.root.logfile=syslog -Xmx1024m org.apache.hadoop.mapreduce.v2.app.MRAppMaster 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr "
application_ACLs {
accessType: APPACCESS_VIEW_APP
acl: " "
}
application_ACLs {
accessType: APPACCESS_MODIFY_APP
acl: " "
}
}
cancel_tokens_when_complete: true
maxAppAttempts: 2
resource {
memory: 1536
virtual_cores: 1
}
applicationType: "MAPREDUCE"
}