6.2 开发YARN的应用程序
YARN能引入其他的计算模式到Hadoop中。Hadoop 2.x、MapReduce、Pig和Hive都有AM的库和对应的客户端。开发人员可以使用YARN API编写自己的应用并运行在现有的Hadoop框架内。同样,企业如果已经有大量的数据集在HDFS中,也可以编写自定义的应用来使用它们,而不需要提供新的集群或者迁移已有的数据。
Storm是一个已经移植到YARN上的实时流处理引擎,带来了将数据向计算机节点移动的应用模式。Spark是另一个可以运行在YARN上的项目,并能利用已有的Hadoop框架来提供基于内存的数据转换(包括MapReduce)。还有很多正在开发的项目都展示了Hadoop有能力成为一个通用集群计算平台。
在本节中,我们将聚集于如何编写一个简单的YARN应用。这个应用从shell获取一个命令,然后在Hadoop集群指定数量的节点上执行。我们需要编写AM和客户端两个程序。
6.2.1 实现YARN客户端
YARN客户端通过ApplicationClientProtocol提交应用到RM,返回的结果是一个分配好的ApplicationId。客户端然后与AM通信,指定部署AM的容器的具体参数。AM是一个需要被独立启动和运行的程序。指定的参数包括AM的库的位置、任何AM运行时需要的环境变量和实际运行时需要的参数。
下面的代码段摘自shell command应用。我们写了一个run方法,它会被DistributedShellClient main方法调用。参数将会通过命令行传递到main方法。这些参数包括AM的JAR文件路径、shell命令行、执行命令需要的容器的数量。让我们看看如何通过如下的步骤编写一个YARN客户端。
DistributedShellClient.java
package MasteringYarn;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class DistributedShellClient {
private Configuration conf = new YarnConfiguration();
public void run(String[] args) throws YarnException, IOException, InterruptedException {
YarnConfiguration yarnConfiguration = new YarnConfiguration();
YarnClient yarnClient = YarnClient.createYarnClient();
//基于传入的配置信息,YarnClient对象获得了RM的信息,并在初始化中创建了一个RM的代理对象。所有于RM的通信是通过这个RM代理进行的
yarnClient.init(yarnConfiguration);
//客户端开始执行
yarnClient.start();
YarnClientApplication yarnClientApplication = yarnClient.createApplication();
//container launch context for application master
ContainerLaunchContext applicationMasterContainer = Records.newRecord(ContainerLaunchContext.class);
//设定启动容器时需要的命令行。这个例子中,用于启动AM的命令行在DistributedShellApplicationMaster类中。
applicationMasterContainer.setCommands(
Collections.singletonList("$JAVA_HOME/bin/java MasteringYarn.DistributedShellApplicationMaster " +
args[2] +
" " +
args[3] +
" " +
"1>" +
ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout " +
"2>" +
ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
);
LocalResource applicationMasterJar = Records.newRecord(LocalResource.class);
setupJarFileForApplicationMaster(new Path(args[1]), applicationMasterJar);
//指定一个包含AM运行逻辑的JAR文件。这个例子中,获取JAR文件的HDFS路径将设置为本地资源
applicationMasterContainer.setLocalResources(
Collections.singletonMap("MasteringYarn.jar", applicationMasterJar)
);
Map<String, String> appMasterEnv = new HashMap<>();
setupEnvironmentForApplicationMaster(appMasterEnv);
applicationMasterContainer.setEnvironment(appMasterEnv);
Resource resources = Records.newRecord(Resource.class);
resources.setVirtualCores(1);
resources.setMemory(100);
ApplicationSubmissionContext submissionContext = yarnClientApplication.getApplicationSubmissionContext();
submissionContext.setAMContainerSpec(applicationMasterContainer);
submissionContext.setQueue("default");
submissionContext.setApplicationName("MasteringYarn");
submissionContext.setResource(resources);
ApplicationId applicationId = submissionContext.getApplicationId();
System.out.println("Submitting " + applicationId);
yarnClient.submitApplication(submissionContext);
System.out.println("Post submission " + applicationId);
ApplicationReport applicationReport;
YarnApplicationState applicationState;
do{
Thread.sleep(1000);
applicationReport = yarnClient.getApplicationReport(applicationId);
applicationState = applicationReport.getYarnApplicationState();
System.out.println("Diagnostics " + applicationReport.getDiagnostics());
}while(applicationState != YarnApplicationState.FAILED &&
applicationState != YarnApplicationState.FINISHED &&
applicationState != YarnApplicationState.KILLED );
System.out.println("Application finished with " + applicationState + " state and id " + applicationId);
}
private void setupJarFileForApplicationMaster(Path jarPath, LocalResource localResource) throws IOException {
FileStatus jarStat = FileSystem.get(conf).getFileStatus(jarPath);
localResource.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
localResource.setSize(jarStat.getLen());
localResource.setTimestamp(jarStat.getModificationTime());
localResource.setType(LocalResourceType.FILE);
localResource.setVisibility(LocalResourceVisibility.PUBLIC);
}
private void setupEnvironmentForApplicationMaster(Map<String, String> environmentMap) {
for (String c : conf.getStrings(
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
Apps.addToEnvironment(environmentMap, ApplicationConstants.Environment.CLASSPATH.name(),
c.trim());
}
Apps.addToEnvironment(environmentMap,
ApplicationConstants.Environment.CLASSPATH.name(),
ApplicationConstants.Environment.PWD.$() + File.separator + "*");
}
public static void main(String[] args) throws Exception {
DistributedShellClient shellClient = new DistributedShellClient();
shellClient.run(args);
}
}
(1)第一步是创建一个YarnConfiguration对象。YarnConfiguration类是Hadoop MapReduce使用的Configuration类的子类。YarnConfiguraion对象创建成功后,该应用就可以读取一些必要的配置文件,例如yarn-site.xml文件。默认的属性被放置在yarn-default.xml文件中。yarn-site.xml文件一般可以在相对于Hadoop安装目录的etc/hadoop目录中找到。
(2)应用客户端现在需要初始化一个YarnClient对象,它是通过调用一个名为createYarnClient的工厂(factory)方法生成的。YarnClient对象使用之前创建的配置进行初始化。基于传入的配置信息,YarnClient对象获得了RM的信息,并在初始化中创建了一个RM的代理对象。所有与RM的通信是通过这个RM代理进行的。代理封装了ApplicationClientProtocol对象,然后调用YarnClient对象的start方法使得客户端开始执行。
(3)另一个可选的方法是开发者自己创建一个代理并自己管理它,比如ApplicationClientProtocol对象就是这样一种代理类型。不过我们还是推荐使用上一种方法。
(4)YarnClient有一个用于获取YarnClientApplication对象的createApplication方法。一旦YarnClient封装了RM的代理,它就包含了检索RM属性、管理提交到RM的应用的方法。
(5)一旦YarnClientApplication被创建,下一步就是申请一个容器来启动AM。YARN中的容器规格是通过ContainerLaunchContext类来描述的。在org.apache.hadoop.yarn.util包中,有一个特殊的Records.newRecord静态工厂方法用来初始化不同的类。
(6)通过浏览ContainerLaunchContext的文档,你可以了解到启动一个容器能够指定的属性。所有的ContainerLaunchContext对象中都可以使用ACL、命令行、环境变量、本地资源,服务的可执行数据和安全令牌(security token)。在以上代码中,ContainerLaunchContext对象被初始化,并调用setCommands来设定启动容器时需要的命令行。在我们的例子中,用于启动AM的命令行在DistributedShellApplicationMaster类中,稍后将定义它。
(7)启动AM要求必需的类或JAR文件在本地存在。下一步是使用ContainerLaunchContext对象的setLocalResources方法指定一个包含AM运行逻辑的JAR文件。在这个例子中,获取JAR文件的HDFS路径将设置为本地资源。这些文件的路径将以命令行参数的形式指定。另外,其他旁路通道也可以用来分派资源到容器的本地环境中。
(8)类似地,如果有任何容器运行需要的环境变量,可以通过使用ContainerLaunchContext对象的setEnvironment方法指定。
(9)不论启动哪种容器,RM所需要的最重要规格参数是容器对CPU和内存的要求。在这个例子中,执行AM的容器需要100MB的内存和1个CPU,这是通过Resource对象指定的。Resource对象是容器计算资源请求的抽象表现形式,当前有CPU和内存两种模型。CPU模型是以虚拟核(virtual core)为基本单位的,它是一个整数值,并且配置中必须将一个虚拟核映射到一个物理核。通常都是1:1映射。内存模型是以MB为单位。指定这两种模型分别使用Resource对象的setVirtualCores和setMemory方法。
(10)最后一步是提交应用到RM的应用管理器,提交的参数绑定在ApplicationSubmissionsContext对象中。YarnClientApplication类有这个对象的引用(reference)。ApplicationSubmissionsContext对象中有容器的规格参数、提交的队列、一个合适应用的名字和一个启动容器所需要的Resource对象。ApplicationSubmissionsContext对象也给出了ApplicationId,在管理应用的API中ApplicationId能用来代表这个应用。在上面的例子中,我们将为应用使用默认队列并设置一个合适的名字——MasteringYarn。最后,YarnClient对象提交应用。在实现内部,RM的代理用来将应用推送到RM。调度器开始介入,并在集群中调度应用的执行。AM是第一个产生的容器。
一旦提交应用结束,可以在YarnClient对象上通过getApplicationReport方法监控应用的进度。ApplicationReport对象包含了能够决定应用成败的有用信息,还包含了一个通用的字段来帮助开发人员在应用失败的情况下获取内部状态。
ApplicationReport对象通过getYarnApplicationState方法来获取应用的但前状态。在下面的代码中,我们每隔一秒钟就会轮询一次应用状态,看其是否终止。如果是KILLED、FINISHED或FAILED状态,应用就会终止。启用getDiagnostics可以打印故障发生时的诊断信息。
客户端还包含好几个有用的方法。第一个方法,setJarFileForApplicationMaster,通过合适的属性设置AM的JAR文件,其中大部分是自解释的。类似地,所有需要的环境变量是使用setEnvironmentForApplicationMaster方法指定的。这个方法也证明了使用YarnConfiguration读取配置文件yarn-site.xml。最后,主驱动方法实例化DistributedShellClient对象并调用它的run方法。
6.2.2 实现AM实例
AM是应用程序的指挥者(leader)。它封装了应用的所有逻辑,并在合适的时候从RM请求资源。与客户端不同的是,AM必须与下面的两种实例保持通信。
- RM:通信的内容是关于应用的全局状态,使用的协议是ApplicationMasterProtocol。
- NM:通信的内容是关于分配给应用的容器的状态,使用的协议是ContainerManager。
实现一个AM和实现一个客户端是类似的。我们从创建一个YarnConfiguration对象开始。为了方便与RM通信,创建一个AMRMClient,这个客户端封装了与RM通信所需要的代理对象。虽然推荐这种更简单的方式,但是再次说明一下,这个代理是可以独自创建的。AMRMClient有很多方法,其中最重要的方法是处理AM的注册方法(registerApplicationMaster)和请求分配容器方法(addContainerRequest)。
为了与NM通信,创建了一个封装了通信协议代理的NMClient对象,这个对象的重要方法包括startContainer和stopContainer,它们被用于启动和终止节点上的容器。
AMRMClient类用于将AM往RM注册。一旦注册成功,AM将启动一个心跳线程,周期性地通知RM它还活着。registerApplicationMaster方法也提供了一个AM正在监听的主机地址和端口号。客户端可以通过这个AM的主机地址和端口号获取应用的信息。
当启动我们的DistributedShell应用时,容器必须按照指定的参数分配好。其中两个必须设置的重要属性是容器的优先级和容器需要分配的资源的总量。
Priority类被实例化以设置容器的优先级。上面的例子中,我们使用的优先级为0。Priority对象只能用于特定的应用程序。
正如我们对AM所做的,我们通过Resource类设置每个容器对资源的要求。回顾一下,setMemory方法设置容器对内存的需求,单位是MB,而setVirtualCores设置对CPU核数的要求。
Priority和Resource对象是为AMRMClient而设计的,ContainerRequest对象会被部署到RM,从而进行资源分配工作。这个构建函数的第二和第三个参数是null,这意味着任意的节点和机架都可以分配给这个容器。这种情况对于MapReduce这样的应用是很有用的,因为它需要利用数据的局部性。这些参数的数据类型是String[],且第二个参数中列出的节点所对应的机架都将会自动地加入到机架列表中。
AMRMClient类中的allocate方法指示RM去分配容器,它同时也是发给RM的一个心跳信息。这个方法的返回结果是一个AllocateResponse对象,这个对象包含新分配的容器的信息、完整的容器列表以及集群相关的信息。它同时也表明在集群中这个应用剩余的可以使用的资源数量。
AllocateResponse也有一个ResponseId,用来无歧义地识别重复的请求。allocate方法的参数是一个浮点类型参数,它表明了这个操作的进度。AM可以通过这个参数展示这个应用的进度。
容器的启动总是要用到ContainerLaunchContext对象。它与启动AM容器的过程非常相似,通过指定命令行、环境变量、本地资源和其他程序需要的参数来执行容器。
然后,容器通过NMClient对象启动,它通知NM去启动容器并执行相关的命令行。startContainer方法接受RM返回的容器对象,这个对象中包含了容器的标识符、认证令牌以及容器所在的节点的信息。
一旦容器启动后,分配资源的调用即可用来监控这些容器。AllocateResponse对象上的getCompletedContainersStatuses方法能获得每个已完成容器的状态。当这些操作都完成后,AM将调用AMRMClient对象的unregisterApplicationMaster方法取消在RM上的注册。应用的状态可以被通知到RM,这样客户端或者其他程序就可以监控这个应用。
FinalApplicatioinStatus枚举类型有FAILED、KILLED、SUCCEEDED和UNDEFINED四个状态值。取消注册的操作也可以获得需要传递的诊断信息,以及客户可以用来获取AM终止相关信息的新的URL地址。
现在可以通过下面的命令执行应用了。date命令会在集群的两个容器中被运行:
hadoop fs -copyFromLocal MasteringYarn.jar
hadoop jar MasteringYarn.jar MasteringYarn.DistributedShellClient hdfs://localhost/usr/sandeepkaranth/MasteringYarn.jar date 2