MapReduce作业提交时连接集群是通过Job的connect()方法实现的,它实际上是构造集群Cluster实例cluster,代码如下:
private synchronized void connect()这个方法用synchronized关键字标识,处理逻辑为:如果cluster为null,构造Cluster实例cluster。
throws IOException, InterruptedException, ClassNotFoundException {
// 如果cluster为null,构造Cluster实例cluster,
// Cluster为连接MapReduce集群的一种工具,提供了一种获取MapReduce集群信息的方法
if (cluster == null) {
cluster =
ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run()
throws IOException, InterruptedException,
ClassNotFoundException {
return new Cluster(getConfiguration());
}
});
}
}
Cluster为连接MapReduce集群的一种工具,提供了一种获取MapReduce集群信息的方法,我们看下它的成员变量,如下所示:
// 客户端通信协议提供者Cluster最重要的两个成员变量是客户端通信协议提供者ClientProtocolProvider实例clientProtocolProvider,客户端通信协议ClientProtocol实例client,而后者是依托前者的create()方法生成的。
private ClientProtocolProvider clientProtocolProvider;
// 客户端通信协议实例
private ClientProtocol client;
// 用户信息
private UserGroupInformation ugi;
// 配置信息
private Configuration conf;
// 文件系统实例
private FileSystem fs = null;
// 系统路径
private Path sysDir = null;
// 阶段区域路径
private Path stagingAreaDir = null;
// 作业历史路径
private Path jobHistoryDir = null;
// 日志
private static final Log LOG = LogFactory.getLog(Cluster.class);
// 客户端通信协议提供者加载器
private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
ServiceLoader.load(ClientProtocolProvider.class);
Cluster提供了两个构造函数,如下:
public Cluster(Configuration conf) throws IOException {最终会调用initialize()方法完成初始化,代码如下:
this(null, conf);
}
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
<span style="white-space:pre"></span>// 设置配置信息
this.conf = conf;
// 获取当前用户
this.ugi = UserGroupInformation.getCurrentUser();
// 调用initialize()方法完成初始化
initialize(jobTrackAddr, conf);
}
// 确定客户端ClientProtocol实例clientinitialize()方法唯一的一个任务就是确定客户端通信协议提供者clientProtocolProvider,并通过其create()方法构造客户端通信协议ClientProtocol实例client。
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
synchronized (frameworkLoader) {
// 取出每个ClientProtocolProvider实例provider,通过其create()方法,
// 构造ClientProtocol实例clientProtocol,
// 并将两者赋值给对类应成员变量,退出循环
for (ClientProtocolProvider provider : frameworkLoader) {
LOG.debug("Trying ClientProtocolProvider : "
+ provider.getClass().getName());
ClientProtocol clientProtocol = null;
try {
// 通过ClientProtocolProvider的create()方法,获取客户端与集群通讯ClientProtocol实例clientProtocol
if (jobTrackAddr == null) {
clientProtocol = provider.create(conf);
} else {
clientProtocol = provider.create(jobTrackAddr, conf);
}
// 设置类成员变量clientProtocolProvider、client,并退出循环
if (clientProtocol != null) {
clientProtocolProvider = provider;
client = clientProtocol;
// 记录debug级别日志信息
LOG.debug("Picked " + provider.getClass().getName()
+ " as the ClientProtocolProvider");
break;
}
else {
// 记录debug级别日志信息
LOG.debug("Cannot pick " + provider.getClass().getName()
+ " as the ClientProtocolProvider - returned null protocol");
}
}
catch (Exception e) {
LOG.info("Failed to use " + provider.getClass().getName()
+ " due to error: " + e.getMessage());
}
}
}
// 如果clientProtocolProvider、client任一为空,直接抛出IO异常
if (null == clientProtocolProvider || null == client) {
throw new IOException(
"Cannot initialize Cluster. Please check your configuration for "
+ MRConfig.FRAMEWORK_NAME
+ " and the correspond server addresses.");
}
}
MapReduce中,ClientProtocolProvider抽象类的实现共有YarnClientProtocolProvider、LocalClientProtocolProvider两种,前者为Yarn模式,而后者为Local模式。
我们先看下Yarn模式,看下YarnClientProtocolProvider的create()方法,代码如下:
@OverrideYarn模式下,如果参数mapreduce.framework.name配置的为yarn,构造一个YARNRunner实例并返回,否则返回null,关于YARNRunner,我们待会再讲,我们接着再看下Local模式,LocalClientProtocolProvider的create()方法,代码如下:
public ClientProtocol create(Configuration conf) throws IOException {
// 如果参数mapreduce.framework.name配置的为yarn,构造一个YARNRunner实例并返回,否则返回null
if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
return new YARNRunner(conf);
}
return null;
}
@OverrideLocal模式也是需要看参数mapreduce.framework.name的配置是否为local,是的话,返回LocalJobRunner实例,并设置map任务数量为1,否则返回null,值得一提的是,这里参数mapreduce.framework.name未配置的话,默认为local,也就是说,MapReduce需要看参数mapreduce.framework.name确定连接模式,但默认是Local模式的。
public ClientProtocol create(Configuration conf) throws IOException {
// 初始化framework:取参数mapreduce.framework.name,参数未配置默认为local
String framework =
conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
// 如果framework是local,,则返回LocalJobRunner实例,并设置map任务数量为1,否则返回null
if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {
return null;
}
conf.setInt(JobContext.NUM_MAPS, 1);
return new LocalJobRunner(conf);
}
到了这里,我们就能够知道一个很重要的信息,Cluster中客户端通信协议ClientProtocol实例,要么是Yarn模式下的YARNRunner,要么就是Local模式下的LocalJobRunner,记住这点,对透彻了解MapReduce作业提交的整体流程非常重要。
好了,我们继续以Yarn模式来分析MapReduce集群连接,看下YARNRunner的实现,先看下它的成员变量,如下:
// 记录工厂RecordFactory实例
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
// ResourceManager代理ResourceMgrDelegate实例
private ResourceMgrDelegate resMgrDelegate;
// 客户端缓存ClientCache实例
private ClientCache clientCache;
// 配置信息Configuration实例
private Configuration conf;
// 文件上下文FileContext实例
private final FileContext defaultFileContext;
其中,最重要的一个变量就是ResourceManager代理ResourceMgrDelegate实例resMgrDelegate,Yarn模式下整个MapReduce客户端就是由它负责与Yarn集群进行通信,完成诸如作业提交、作业状态查询等过程,通过它获取集群的信息,其内部有一个YarnClient实例YarnClient,负责与Yarn进行通信,还有ApplicationId、ApplicationSubmissionContext等与特定应用程序相关的成员变量。关于ResourceMgrDelegate的详细介绍,请阅读《MapReduce源码分析ResourceMgrDelegate》一文,这里不再做详细介绍。
另外一个比较重要的变量就是客户端缓存ClientCache实例clientCache,
接下来,我们看下YARNRunner的构造函数,如下:
/**YARNRunner一共提供了三个构造函数,而我们之前说的WordCount作业提交时,其内部调用的是YARNRunner带有一个参数的构造函数,它会先构造ResourceManager代理ResourceMgrDelegate实例,然后再调用两个参数的构造函数,继而构造客户端缓存ClientCache实例,然后再调用三个参数的构造函数,而最终的构造函数只是进行简单的类成员变量赋值,然后通过FileContext的静态getFileContext()方法获取文件山下文FileContext实例defaultFileContext。
* Yarn runner incapsulates the client interface of
* yarn
* @param conf the configuration object for the client
*/
public YARNRunner(Configuration conf) {
// 先构造ResourceManager代理ResourceMgrDelegate实例,然后再调用两个参数的构造函数
this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
}
/**
* Similar to {@link #YARNRunner(Configuration)} but allowing injecting
* {@link ResourceMgrDelegate}. Enables mocking and testing.
* @param conf the configuration object for the client
* @param resMgrDelegate the resourcemanager client handle.
*/
public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
// 先构造客户端缓存ClientCache实例,然后再调用三个参数的构造函数
this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
}
/**
* Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
* but allowing injecting {@link ClientCache}. Enable mocking and testing.
* @param conf the configuration object
* @param resMgrDelegate the resource manager delegate
* @param clientCache the client cache object.
*/
public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
ClientCache clientCache) {
// 成员变量赋值
this.conf = conf;
try {
this.resMgrDelegate = resMgrDelegate;
this.clientCache = clientCache;
// 获取文件山下文FileContext实例defaultFileContext
this.defaultFileContext = FileContext.getFileContext(this.conf);
} catch (UnsupportedFileSystemException ufe) {
throw new RuntimeException("Error in instantiating YarnClient", ufe);
}
}
总结
MapReduce作业提交时连接集群是通过Job的connect()方法实现的,它实际上是构造集群Cluster实例cluster。Cluster为连接MapReduce集群的一种工具,提供了一种获取MapReduce集群信息的方法。在Cluster内部,有一个与集群进行通信的客户端通信协议ClientProtocol实例client,它由ClientProtocolProvider的静态create()方法构造,而Hadoop2.6.0中提供了两种模式的ClientProtocol,分别为Yarn模式的YARNRunner和Local模式的LocalJobRunner,Cluster实际上是由它们负责与集群进行通信的,而Yarn模式下,ClientProtocol实例YARNRunner对象内部有一个ResourceManager代理ResourceMgrDelegate实例resMgrDelegate,Yarn模式下整个MapReduce客户端就是由它负责与Yarn集群进行通信,完成诸如作业提交、作业状态查询等过程,通过它获取集群的信息。