Hadoop源码浅析——Job提交相关

时间:2021-12-07 19:39:42

Configuration类是用来访问hadoop的配置参数的。

Configuration类首先会通过静态代码段加载hadoop的配置文件core-default.xml和和core-site.xml,相关代码如下:

static{ 
//print deprecation warning if hadoop-site.xml is found in classpath
ClassLoader cL = Thread.currentThread().getContextClassLoader();
if (cL == null) {
cL = Configuration.class.getClassLoader();
}
if(cL.getResource("hadoop-site.xml")!=null) {
LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
"Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, "
+ "mapred-site.xml and hdfs-site.xml to override properties of " +
"core-default.xml, mapred-default.xml and hdfs-default.xml " +
"respectively");
}
addDefaultResource("core-default.xml");
addDefaultResource("core-site.xml");
}

defaultResources是一个ArrayList,用来保存默认的配置文件路径。如果一个默认的配置文件路径不在defaultResource里面,就添加进去,这个逻辑是在

addDefaultResource方法中实现的。

properties是一个Properties对象,保存从配置文件中解析出来的配置属性,如果多个配置文件有相同的key,后者会覆盖前者的值。

JobConf类用来配置Map/Reduce作业信息的,继承自Configuration类。

JobConf类首先会通过静态代码段加载mapred-default.xml和mapred-site.xml配置属性文件。

DEFAULT_MAPRED_TASK_JAVA_OPTS=“-Xmx200m”,默认情况下Map/Reduce任务的JAVA命令行选项指定的JAVA虚拟机最大内存是200M。

JobClient类是用户与JobTracker交互的主要接口,通过它可以提交jobs,追踪job的进度,访问task组件的日志,查询集群的状态信息等。

提交job是通过runJob方法实现的,相关代码如下:

public static RunningJob runJob(JobConf job) throws IOException {
JobClient jc = new JobClient(job);
RunningJob rj = jc.submitJob(job);
try {
if (!jc.monitorAndPrintJob(job, rj)) {
LOG.info("Job Failed: " + rj.getFailureInfo());
throw new IOException("Job failed!");
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
return rj;
}

首先创建一个JobClient对象,此对象在构造函数中会根据JobConf对象去连接JobTracker。

 JobClient与JobTracker通信是通过jobSubmitClient操作的,jobSubmitClient是JobSubmissionProtocol类型的动态代理类,是通过如下方法产生的:

  private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
Configuration conf) throws IOException {
return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
JobSubmissionProtocol.versionID, addr,
UserGroupInformation.getCurrentUser(), conf,
NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
}

getProxy方法的关键是Invoker类,Invoker类实现了InvocationHandler接口,主要有两个成员变量,remoteId是Client.ConnectionId类型,保存连接地址和用户的ticket,客户端连接服务器由<remoteAddress,protocol,ticket>唯一标识。从这里我们也可以看到一些配置属性值,默认的rpcTimeout是0,

ipc.client.connection.maxidletime客户端连接的最大空闲时间是10s,

ipc.client.connect.max.retries客户端同服务器建立连接时的最大重试次数是10,

ipc.client.tcpnodelay是否开启Nagle算法(对TCP/IP进行拥塞控制),如果开启,会减少延迟,但是会增加小数据报,默认是false。client是Client类,用于IPC通信。client会通过ClientCache类来缓存,如果缓存中没有,会新建一个Client,否则原client计数加1。Invoker类主要的方法是invoke方法,invoke方法的功能是调用client的方法然后返回结果。动态代理类代理的对象是Client对象。

submitJobInternal方法是真正用来提交job的,具体步骤如下:

1、初始化staging目录,staging目录根目录是由mapreduce.jobtracker.staging.root.dir配置的,默认是/tmp/hadoop/mapred/staging,具体到某个用户的staging目录是$ROOT/userName/.staging。

2、从JobTracker那里取得新的job id,job id从1开始递增。

3、获得提交job的目录submitJobDir=用户的staging目录/jobid,并且将这个目录设置成mapreduce.job.dir的值。

 4、copyAndConfigureFiles拷贝和初始化文件,首先从配置属性mapred.submit.replication取得replication值,默认为10。然后判断submitJobDir目录是否存在,如果存在拋异常;否则创建submitJobDir目录;取得job的分布式缓存文件路径=submitJobDir/files;取得job的分布式缓存存档路径=submitJobDir/archives;取得job的分布式缓存libjars路径=submitJobDir/libjars;如果命令行参数有tmpfiles,则将这些文件拷贝到分布式缓存文件路径下,同时将这个路径加入到分布式缓存中;如果命令行参数有tmpjars,则将这些文件拷贝到分布式缓存libjars路径下,同时将这个路径加入到分布式缓存中;如果命令行参数有tmparchives,则将这些文件拷贝到分布式缓存存档路径下,同时将这个路径加入到分布式缓存中;根据mapred.jar属性取得jar包的路径,如果没有指定job的名字,那么将使用jar包的名字作为job名字;取得job jar的存储路径=submitJobDir/job.jar;将用户指定的jar包拷贝到job jar的存储路径;设置工作目录,默认是配置属性mapred.working.dir指定的值。

5、取得job配置文件的路径submitJobFile=submitJobDir/job.xml;设置

mapreduce.job.submithostaddress为本机ip地址,设置

mapreduce.job.submithost为本机主机名。

6、为job创建输入分区,这是由writeSplits方法完成的。以old api为例,首先调用InputFormat的getSplits方法得到一个InputSplit分区数组,FileInputFormat类的getSplits方法实现过程如下:

通过listStatus方法取得输入文件路径列表,过滤掉_和.开头的路径以及根据设置的mapred.input.pathFilter.class过滤;

在JobConf中设置mapreduce.input.num.files为输入文件数;

计算出所有输入文件的总大小totalSize,目标分区大小goalSize=totalSize/numSplits(由mapred.map.tasks配置,默认为1),最小分区大小minSize=mapred.min.split.size配置和1之间的较大值,对于每一个输入文件,如果这个文件的长度不等于0并且是可切分的,计算分区大小splitSize=Math.max(minSize,Math.min(goalSize,blockSize)),blockSize为HDFS存储文件的块大小,对于每一个分区大小,计算对其贡献最大的主机数组(根据机架以及块的字节大小确定),然后将这个分区加入到分区列表;然后根据分区大长度从大到小对分区列表进行排序;然后将分区列表写入到分区文件,分区文件名=submitJobDir/job.split,分区文件的存储格式:SPL字节信息,分区版本号,{InputSplit类名,InputSplit类信息}+;SplitMetaInfo数组记录每个分区信息在文件中的偏移,主机信息和长度;将分区Meta信息SplitMetaInfo数组写入到文件submitJobDir/job.splitmetainfo。

7、JobConf设置mapred.map.tasks为分区数。

8、根据mapred.job.queue.name获得job提交的队列的名字,默认是default,然后根据这个队列名字获得访问控制列表。

9、将重新配置过的JobConf写入到submitJobDir/job.xml文件。

10、将jobid,submitJobDir信息传给JobTracker正式提交job,并通过NetworkedJob对象跟踪job的状态。

monitorAndPrintJob方法监控job的运行并且实时打印job的状态。