我们基于程序自带的例子（Distributed Shell），来实现向 YARN 的 ResourceManager 提交 application。

Distributed Shell application 中 Client 端的步骤：

连接 ResourceManager；

通过 ApplicationClientProtocol 协议与 ApplicationsManager 交互，提交 AM；与此同时，还可以通过该协议获取集群的一些信息。

* <p> For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}.
* The {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId}
* and application name, the priority assigned to the application and the queue
* to which this application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext}
* also defines the {@link ContainerLaunchContext} which describes the <code>Container</code> with which
* the {@link ApplicationMaster} is launched. </p>
* <p> The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the
* {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available
* and the environment to be set for the {@link ApplicationMaster} and the commands to be executed to run the
* {@link ApplicationMaster}. </p>
* <p> Using the {@link ApplicationSubmissionContext}, the client submits the application to the
* <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code>
* for an {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client
* kills the application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>. </p>

public class Client { private static final Log LOG = LogFactory.getLog(Client.class); // Configuration
private Configuration conf;
private YarnClient yarnClient;
// Application master specific info to register a new Application with RM/ASM
private String appName = "";
// App master priority
private int amPriority = 0;
// Queue for App master
private String amQueue = "";
// Amt. of memory resource to request for to run the App Master
private int amMemory = 10; // Application master jar file
private String appMasterJar = "";
// Main class to invoke application master
private final String appMasterMainClass; // Shell command to be executed
private String shellCommand = "";
// Location of shell script
private String shellScriptPath = "";
// Args to be passed to the shell command
private String shellArgs = "";
// Env variables to be setup for the shell command
private Map<String, String> shellEnv = new HashMap<String, String>();
// Shell Command Container priority
private int shellCmdPriority = 0; // Amt of memory to request for container in which shell script will be executed
private int containerMemory = 10;
// No. of containers in which the shell script needs to be executed
private int numContainers = 1; // log4j.properties file
// if available, add to local resources and set into classpath
private String log4jPropFile = ""; // Start time for client
private final long clientStartTime = System.currentTimeMillis();
// Timeout threshold for client. Kill app after time interval expires.
private long clientTimeout = 600000; // Debug flag
boolean debugFlag = false; // Command line options
private Options opts; /**
* @param args Command line arguments
public static void main(String[] args) {
boolean result = false;
try {
Client client = new Client();
LOG.info("Initializing Client");
try {
boolean doRun = client.init(args);
if (!doRun) {
} catch (IllegalArgumentException e) {
result = client.run();
} catch (Throwable t) {
LOG.fatal("Error running CLient", t);
if (result) {
LOG.info("Application completed successfully");
LOG.error("Application failed to complete successfully");
} /**
public Client(Configuration conf) throws Exception {
} Client(String appMasterMainClass, Configuration conf) {
this.conf = conf;
this.appMasterMainClass = appMasterMainClass;
yarnClient = YarnClient.createYarnClient();
opts = new Options();
opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
opts.addOption("priority", true, "Application Priority. Default 0");
opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
opts.addOption("timeout", true, "Application timeout in milliseconds");
opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
opts.addOption("jar", true, "Jar file containing the application master");
opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
opts.addOption("shell_script", true, "Location of the shell script to be executed");
opts.addOption("shell_args", true, "Command line args for the shell script");
opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
opts.addOption("log_properties", true, "log4j.properties file");
opts.addOption("debug", false, "Dump out debug information");
opts.addOption("help", false, "Print usage"); } /**
public Client() throws Exception {
this(new YarnConfiguration());
} /**
* Helper function to print out usage
private void printUsage() {
new HelpFormatter().printHelp("Client", opts);
} /**
* Parse command line options
* @param args Parsed command line options
* @return Whether the init was successful to run the client
* @throws ParseException
public boolean init(String[] args) throws ParseException { CommandLine cliParser = new GnuParser().parse(opts, args); if (args.length == 0) {
throw new IllegalArgumentException("No args specified for client to initialize");
} if (cliParser.hasOption("help")) {
return false;
} if (cliParser.hasOption("debug")) {
debugFlag = true; } appName = cliParser.getOptionValue("appname", "DistributedShell");
amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
amQueue = cliParser.getOptionValue("queue", "default");
amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10")); if (amMemory < 0) {
throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
+ " Specified memory=" + amMemory);
} if (!cliParser.hasOption("jar")) {
throw new IllegalArgumentException("No jar file specified for application master");
} appMasterJar = cliParser.getOptionValue("jar"); if (!cliParser.hasOption("shell_command")) {
throw new IllegalArgumentException("No shell command specified to be executed by application master");
shellCommand = cliParser.getOptionValue("shell_command"); if (cliParser.hasOption("shell_script")) {
shellScriptPath = cliParser.getOptionValue("shell_script");
if (cliParser.hasOption("shell_args")) {
shellArgs = cliParser.getOptionValue("shell_args");
if (cliParser.hasOption("shell_env")) {
String envs[] = cliParser.getOptionValues("shell_env");
for (String env : envs) {
env = env.trim();
int index = env.indexOf('=');
if (index == -1) {
shellEnv.put(env, "");
String key = env.substring(0, index);
String val = "";
if (index < (env.length()-1)) {
val = env.substring(index+1);
shellEnv.put(key, val);
shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0")); containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1")); if (containerMemory < 0 || numContainers < 1) {
throw new IllegalArgumentException("Invalid no. of containers or container memory specified, exiting."
+ " Specified containerMemory=" + containerMemory
+ ", numContainer=" + numContainers);
} clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000")); log4jPropFile = cliParser.getOptionValue("log_properties", ""); return true;
} /**
* Main run function for the client
* @return true if application completed successfully
* @throws IOException
* @throws YarnException
public boolean run() throws IOException, YarnException { LOG.info("Running Client");
yarnClient.start(); YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
LOG.info("Got Cluster metric info from ASM"
+ ", numNodeManagers=" + clusterMetrics.getNumNodeManagers()); List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(
LOG.info("Got Cluster node info from ASM");
for (NodeReport node : clusterNodeReports) {
LOG.info("Got node report from ASM for"
+ ", nodeId=" + node.getNodeId()
+ ", nodeAddress" + node.getHttpAddress()
+ ", nodeRackName" + node.getRackName()
+ ", nodeNumContainers" + node.getNumContainers());
} QueueInfo queueInfo = yarnClient.getQueueInfo(this.amQueue);
LOG.info("Queue info"
+ ", queueName=" + queueInfo.getQueueName()
+ ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
+ ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
+ ", queueApplicationCount=" + queueInfo.getApplications().size()
+ ", queueChildQueueCount=" + queueInfo.getChildQueues().size()); List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo();
for (QueueUserACLInfo aclInfo : listAclInfo) {
for (QueueACL userAcl : aclInfo.getUserAcls()) {
LOG.info("User ACL Info for Queue"
+ ", queueName=" + aclInfo.getQueueName()
+ ", userAcl=" + userAcl.name());
} // Get a new application id
YarnClientApplication app = yarnClient.createApplication();
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
// TODO get min/max resource capabilities from RM and change memory ask if needed
// If we do not have min/max, we may not be able to correctly request
// the required resources from the RM for the app master
// Memory ask has to be a multiple of min and less than max.
// Dump out information about cluster capability as seen by the resource manager
int maxMem = appResponse.getMaximumResourceCapability().getMemory();
LOG.info("Max mem capabililty of resources in this cluster " + maxMem); // A resource ask cannot exceed the max.
if (amMemory > maxMem) {
LOG.info("AM memory specified above max threshold of cluster. Using max value."
+ ", specified=" + amMemory
+ ", max=" + maxMem);
amMemory = maxMem;
} // set the application name
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
appContext.setApplicationName(appName); // Set up the container launch context for the application master
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); // set local resources for the application master
// local files or archives as needed
// In this scenario, the jar file for the application master is part of the local resources
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); LOG.info("Copy App Master jar from local filesystem and add to local environment");
// Copy the application master jar to the filesystem
// Create a local resource to point to the destination jar path
FileSystem fs = FileSystem.get(conf);
Path src = new Path(appMasterJar);
String pathSuffix = appName + "/" + appId.getId() + "/AppMaster.jar";
Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
fs.copyFromLocalFile(false, true, src, dst);
FileStatus destStatus = fs.getFileStatus(dst);
LocalResource amJarRsrc = Records.newRecord(LocalResource.class); // Set the type of resource - file or archive
// archives are untarred at destination
// we don't need the jar file to be untarred for now
// Set visibility of the resource
// Setting to most private option
// Set the resource to be copied over
// Set timestamp and length of file so that the framework
// can do basic sanity checks for the local resource
// after it has been copied over to ensure it is the same
// resource the client intended to use with the application
localResources.put("AppMaster.jar", amJarRsrc); // Set the log4j properties if needed
if (!log4jPropFile.isEmpty()) {
Path log4jSrc = new Path(log4jPropFile);
Path log4jDst = new Path(fs.getHomeDirectory(), "log4j.props");
fs.copyFromLocalFile(false, true, log4jSrc, log4jDst);
FileStatus log4jFileStatus = fs.getFileStatus(log4jDst);
LocalResource log4jRsrc = Records.newRecord(LocalResource.class);
localResources.put("log4j.properties", log4jRsrc);
} // The shell script has to be made available on the final container(s)
// where it will be executed.
// To do this, we need to first copy into the filesystem that is visible
// to the yarn framework.
// We do not need to set this as a local resource for the application
// master as the application master does not need it.
String hdfsShellScriptLocation = "";
long hdfsShellScriptLen = 0;
long hdfsShellScriptTimestamp = 0;
if (!shellScriptPath.isEmpty()) {
Path shellSrc = new Path(shellScriptPath);
String shellPathSuffix = appName + "/" + appId.getId() + "/ExecShellScript.sh";
Path shellDst = new Path(fs.getHomeDirectory(), shellPathSuffix);
fs.copyFromLocalFile(false, true, shellSrc, shellDst);
hdfsShellScriptLocation = shellDst.toUri().toString();
FileStatus shellFileStatus = fs.getFileStatus(shellDst);
hdfsShellScriptLen = shellFileStatus.getLen();
hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
} // Set local resource info into app master container launch context
amContainer.setLocalResources(localResources); // Set the necessary security tokens as needed
//amContainer.setContainerTokens(containerToken); // Set the env variables to be setup in the env where the application master will be run
LOG.info("Set the environment for the application master");
Map<String, String> env = new HashMap<String, String>(); // put location of shell script into env
// using the env info, the application master will create the correct local resource for the
// eventual containers that will be launched to execute the shell scripts
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen)); // Add AppMaster.jar location to classpath
// At some point we should not be required to add
// the hadoop specific classpaths to the env.
// It should be provided out of the box.
// For now setting all required classpaths including
// the classpath to "." for the application jar
StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$())
for (String c : conf.getStrings(
classPathEnv.append(File.pathSeparatorChar).append("./log4j.properties"); // add the runtime classpath needed for tests to work
if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
} env.put("CLASSPATH", classPathEnv.toString()); amContainer.setEnvironment(env); // Set the necessary command to execute the application master
Vector<CharSequence> vargs = new Vector<CharSequence>(30); // Set java executable command
LOG.info("Setting up app master command");
vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
// Set Xmx based on am memory size
vargs.add("-Xmx" + amMemory + "m");
// Set class name
// Set params for Application Master
vargs.add("--container_memory " + String.valueOf(containerMemory));
vargs.add("--num_containers " + String.valueOf(numContainers));
vargs.add("--priority " + String.valueOf(shellCmdPriority));
if (!shellCommand.isEmpty()) {
vargs.add("--shell_command " + shellCommand + "");
if (!shellArgs.isEmpty()) {
vargs.add("--shell_args " + shellArgs + "");
for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
if (debugFlag) {
} vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); // Get final commmand
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
command.append(str).append(" ");
} LOG.info("Completed setting up app master command " + command.toString());
List<String> commands = new ArrayList<String>();
amContainer.setCommands(commands); // Set up resource type requirements
// For now, only memory is supported so we set memory requirements
Resource capability = Records.newRecord(Resource.class);
appContext.setResource(capability); // Service data is a binary blob that can be passed to the application
// Not needed in this scenario
// amContainer.setServiceData(serviceData); // Setup security tokens
if (UserGroupInformation.isSecurityEnabled()) {
Credentials credentials = new Credentials();
String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
if (tokenRenewer == null || tokenRenewer.length() == 0) {
throw new IOException(
"Can't get Master Kerberos principal for the RM to use as renewer");
} // For now, only getting tokens for the default file-system.
final Token<?> tokens[] =
fs.addDelegationTokens(tokenRenewer, credentials);
if (tokens != null) {
for (Token<?> token : tokens) {
LOG.info("Got dt for " + fs.getUri() + "; " + token);
DataOutputBuffer dob = new DataOutputBuffer();
ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
} appContext.setAMContainerSpec(amContainer); // Set the priority for the application master
Priority pri = Records.newRecord(Priority.class);
// TODO - what is the range for priority? how to decide?
appContext.setPriority(pri); // Set the queue to which this application is to be submitted in the RM
appContext.setQueue(amQueue); // Submit the application to the applications manager
// SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
// Ignore the response as either a valid response object is returned on success
// or an exception thrown to denote some form of a failure
LOG.info("Submitting application to ASM"); yarnClient.submitApplication(appContext); // TODO
// Try submitting the same request again
// app submission failure? // Monitor the application
return monitorApplication(appId); } /**
* Monitor the submitted application for completion.
* Kill application if time expires.
* @param appId Application Id of application to be monitored
* @return true if application completed successfully
* @throws YarnException
* @throws IOException
private boolean monitorApplication(ApplicationId appId)
throws YarnException, IOException { while (true) { // Check app status every 1 second.
try {
} catch (InterruptedException e) {
LOG.debug("Thread sleep in monitoring loop interrupted");
} // Get application report for the appId we are interested in
ApplicationReport report = yarnClient.getApplicationReport(appId); LOG.info("Got application report from ASM for"
+ ", appId=" + appId.getId()
+ ", clientToAMToken=" + report.getClientToAMToken()
+ ", appDiagnostics=" + report.getDiagnostics()
+ ", appMasterHost=" + report.getHost()
+ ", appQueue=" + report.getQueue()
+ ", appMasterRpcPort=" + report.getRpcPort()
+ ", appStartTime=" + report.getStartTime()
+ ", yarnAppState=" + report.getYarnApplicationState().toString()
+ ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
+ ", appTrackingUrl=" + report.getTrackingUrl()
+ ", appUser=" + report.getUser()); YarnApplicationState state = report.getYarnApplicationState();
FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
if (YarnApplicationState.FINISHED == state) {
if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
LOG.info("Application has completed successfully. Breaking monitoring loop");
return true;
else {
LOG.info("Application did finished unsuccessfully."
+ " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
+ ". Breaking monitoring loop");
return false;
else if (YarnApplicationState.KILLED == state
|| YarnApplicationState.FAILED == state) {
LOG.info("Application did not finish."
+ " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
+ ". Breaking monitoring loop");
return false;
} if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
LOG.info("Reached client specified timeout for application. Killing application");
return false;
} } /**
* Kill a submitted application by sending a call to the ASM
* @param appId Application Id to be killed.
* @throws YarnException
* @throws IOException
private void forceKillApplication(ApplicationId appId)
throws YarnException, IOException {
// TODO clarify whether multiple jobs with the same app id can be submitted and be running at
// the same time.
// If yes, can we kill a particular attempt only? // Response can be ignored as it is non-null on success or
// throws an exception in case of failures
} }


*/ package org.apache.hadoop.yarn.applications.distributedshell; import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records; /**
* An ApplicationMaster for executing shell commands on a set of launched
* containers using the YARN framework.
* <p>
* This class is meant to act as an example on how to write yarn-based
* application masters.
* </p>
* <p>
* The ApplicationMaster is started on a container by the
* <code>ResourceManager</code>'s launcher. The first thing that the
* <code>ApplicationMaster</code> needs to do is to connect and register itself
* with the <code>ResourceManager</code>. The registration sets up information
* within the <code>ResourceManager</code> regarding what host:port the
* ApplicationMaster is listening on to provide any form of functionality to a
* client as well as a tracking url that a client can use to keep track of
* status/job history if needed. However, in the distributedshell, trackingurl
* and appMasterHost:appMasterRpcPort are not supported.
* </p>
* <p>
* The <code>ApplicationMaster</code> needs to send a heartbeat to the
* <code>ResourceManager</code> at regular intervals to inform the
* <code>ResourceManager</code> that it is up and alive. The
* {@link ApplicationMasterProtocol#allocate} to the <code>ResourceManager</code> from the
* <code>ApplicationMaster</code> acts as a heartbeat.
* <p>
* For the actual handling of the job, the <code>ApplicationMaster</code> has to
* request the <code>ResourceManager</code> via {@link AllocateRequest} for the
* required no. of containers using {@link ResourceRequest} with the necessary
* resource specifications such as node location, computational
* (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
* responds with an {@link AllocateResponse} that informs the
* <code>ApplicationMaster</code> of the set of newly allocated containers,
* completed containers as well as current state of available resources.
* </p>
* <p>
* For each allocated container, the <code>ApplicationMaster</code> can then set
* up the necessary launch context via {@link ContainerLaunchContext} to specify
* the allocated container id, local resources required by the executable, the
* environment to be setup for the executable, commands to execute, etc. and
* submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to
* launch and execute the defined commands on the given allocated container.
* </p>
* <p>
* The <code>ApplicationMaster</code> can monitor the launched container by
* either querying the <code>ResourceManager</code> using
* {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via
* the {@link ContainerManagementProtocol} by querying for the status of the allocated
* container's {@link ContainerId}.
* <p>
* After the job has been completed, the <code>ApplicationMaster</code> has to
* send a {@link FinishApplicationMasterRequest} to the
* <code>ResourceManager</code> to inform it that the
* <code>ApplicationMaster</code> has been completed.
*/
public class ApplicationMaster { private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); // Configuration
private Configuration conf; // Handle to communicate with the Resource Manager
private AMRMClientAsync amRMClient; // Handle to communicate with the Node Manager
private NMClientAsync nmClientAsync;
// Listen to process the response from the Node Manager
private NMCallbackHandler containerListener; // Application Attempt Id ( combination of attemptId and fail count )
private ApplicationAttemptId appAttemptID; // TODO
// For status update for clients - yet to be implemented
// Hostname of the container
private String appMasterHostname = "";
// Port on which the app master listens for status updates from clients
private int appMasterRpcPort = -1;
// Tracking url to which app master publishes info for clients to monitor
private String appMasterTrackingUrl = ""; // App Master configuration
// No. of containers to run shell command on
private int numTotalContainers = 1;
// Memory to request for the container on which the shell command will run
private int containerMemory = 10;
// Priority of the request
private int requestPriority; // Counter for completed containers ( complete denotes successful or failed )
private AtomicInteger numCompletedContainers = new AtomicInteger();
// Allocated container count so that we know how many containers has the RM
// allocated to us
private AtomicInteger numAllocatedContainers = new AtomicInteger();
// Count of failed containers
private AtomicInteger numFailedContainers = new AtomicInteger();
// Count of containers already requested from the RM
// Needed as once requested, we should not request for containers again.
// Only request for more if the original requirement changes.
private AtomicInteger numRequestedContainers = new AtomicInteger(); // Shell command to be executed
private String shellCommand = "";
// Args to be passed to the shell command
private String shellArgs = "";
// Env variables to be setup for the shell command
private Map<String, String> shellEnv = new HashMap<String, String>(); // Location of shell script ( obtained from info set in env )
// Shell script path in fs
private String shellScriptPath = "";
// Timestamp needed for creating a local resource
private long shellScriptPathTimestamp = 0;
// File length needed for local resource
private long shellScriptPathLen = 0; // Hardcoded path to shell script in launch container's local env
private final String ExecShellStringPath = "ExecShellScript.sh"; private volatile boolean done;
private volatile boolean success; private ByteBuffer allTokens; // Launch threads
private List<Thread> launchThreads = new ArrayList<Thread>(); /**
* @param args Command line args
public static void main(String[] args) {
boolean result = false;
try {
ApplicationMaster appMaster = new ApplicationMaster();
LOG.info("Initializing ApplicationMaster");
boolean doRun = appMaster.init(args);
if (!doRun) {
result = appMaster.run();
} catch (Throwable t) {
LOG.fatal("Error running ApplicationMaster", t);
if (result) {
LOG.info("Application Master completed successfully. exiting");
} else {
LOG.info("Application Master failed. exiting");
} /**
* Dump out contents of $CWD and the environment to stdout for debugging
private void dumpOutDebugInfo() { LOG.info("Dump debug output");
Map<String, String> envs = System.getenv();
for (Map.Entry<String, String> env : envs.entrySet()) {
LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
System.out.println("System env: key=" + env.getKey() + ", val="
+ env.getValue());
} String cmd = "ls -al";
Runtime run = Runtime.getRuntime();
Process pr = null;
try {
pr = run.exec(cmd);
pr.waitFor(); BufferedReader buf = new BufferedReader(new InputStreamReader(
String line = "";
while ((line = buf.readLine()) != null) {
LOG.info("System CWD content: " + line);
System.out.println("System CWD content: " + line);
} catch (IOException e) {
} catch (InterruptedException e) {
} public ApplicationMaster() {
// Set up the configuration
conf = new YarnConfiguration();
} /**
* Parse command line options
* @param args Command line args
* @return Whether init successful and run should be invoked
* @throws ParseException
* @throws IOException
public boolean init(String[] args) throws ParseException, IOException { Options opts = new Options();
opts.addOption("app_attempt_id", true,
"App Attempt ID. Not to be used unless for testing purposes");
opts.addOption("shell_command", true,
"Shell command to be executed by the Application Master");
opts.addOption("shell_script", true,
"Location of the shell script to be executed");
opts.addOption("shell_args", true, "Command line args for the shell script");
opts.addOption("shell_env", true,
"Environment for shell script. Specified as env_key=env_val pairs");
opts.addOption("container_memory", true,
"Amount of memory in MB to be requested to run the shell command");
opts.addOption("num_containers", true,
"No. of containers on which the shell command needs to be executed");
opts.addOption("priority", true, "Application Priority. Default 0");
opts.addOption("debug", false, "Dump out debug information"); opts.addOption("help", false, "Print usage");
CommandLine cliParser = new GnuParser().parse(opts, args); if (args.length == 0) {
throw new IllegalArgumentException(
"No args specified for application master to initialize");
} if (cliParser.hasOption("help")) {
return false;
} if (cliParser.hasOption("debug")) {
} Map<String, String> envs = System.getenv(); if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
if (cliParser.hasOption("app_attempt_id")) {
String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
} else {
throw new IllegalArgumentException(
"Application Attempt Id not set in the environment");
} else {
ContainerId containerId = ConverterUtils.toContainerId(envs
appAttemptID = containerId.getApplicationAttemptId();
} if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
+ " not set in the environment");
if (!envs.containsKey(Environment.NM_HOST.name())) {
throw new RuntimeException(Environment.NM_HOST.name()
+ " not set in the environment");
if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
throw new RuntimeException(Environment.NM_HTTP_PORT
+ " not set in the environment");
if (!envs.containsKey(Environment.NM_PORT.name())) {
throw new RuntimeException(Environment.NM_PORT.name()
+ " not set in the environment");
} LOG.info("Application master for app" + ", appId="
+ appAttemptID.getApplicationId().getId() + ", clustertimestamp="
+ appAttemptID.getApplicationId().getClusterTimestamp()
+ ", attemptId=" + appAttemptID.getAttemptId()); if (!cliParser.hasOption("shell_command")) {
throw new IllegalArgumentException(
"No shell command specified to be executed by application master");
shellCommand = cliParser.getOptionValue("shell_command"); if (cliParser.hasOption("shell_args")) {
shellArgs = cliParser.getOptionValue("shell_args");
if (cliParser.hasOption("shell_env")) {
String shellEnvs[] = cliParser.getOptionValues("shell_env");
for (String env : shellEnvs) {
env = env.trim();
int index = env.indexOf('=');
if (index == -1) {
shellEnv.put(env, "");
String key = env.substring(0, index);
String val = "";
if (index < (env.length() - 1)) {
val = env.substring(index + 1);
shellEnv.put(key, val);
} if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION); if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
shellScriptPathTimestamp = Long.valueOf(envs
if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
shellScriptPathLen = Long.valueOf(envs
} if (!shellScriptPath.isEmpty()
&& (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
LOG.error("Illegal values in env for shell script path" + ", path="
+ shellScriptPath + ", len=" + shellScriptPathLen + ", timestamp="
+ shellScriptPathTimestamp);
throw new IllegalArgumentException(
"Illegal values in env for shell script path");
} containerMemory = Integer.parseInt(cliParser.getOptionValue(
"container_memory", "10"));
numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
"num_containers", "1"));
if (numTotalContainers == 0) {
throw new IllegalArgumentException(
"Cannot run distributed shell with no containers");
requestPriority = Integer.parseInt(cliParser
.getOptionValue("priority", "0")); return true;
} /**
* Helper function to print usage
* @param opts Parsed command line options
private void printUsage(Options opts) {
new HelpFormatter().printHelp("ApplicationMaster", opts);
} /**
* Main run function for the application master.
* <p>
* Strips the AM->RM token from the credentials shipped to containers, starts
* the RM and NM async clients, registers with the RM, and requests
* {@code numTotalContainers} containers. It then waits until all of them have
* completed or a callback sets {@code done}, and calls {@code finish()}.
*
* @return {@code true} if every container completed successfully
* @throws YarnException if registering with the RM fails
* @throws IOException if reading/serializing credentials or registering fails
*/
@SuppressWarnings({ "unchecked" })
public boolean run() throws YarnException, IOException {
  LOG.info("Starting ApplicationMaster");

  // Fully qualified so that this method does not rely on an extra import.
  Credentials credentials = org.apache.hadoop.security.UserGroupInformation
      .getCurrentUser().getCredentials();
  // Remove the AM->RM token so that containers cannot access it. This must
  // happen BEFORE serializing, otherwise the token is still in the bytes
  // handed to the containers.
  Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
  while (iter.hasNext()) {
    Token<?> token = iter.next();
    if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
      iter.remove();
    }
  }
  DataOutputBuffer dob = new DataOutputBuffer();
  credentials.writeTokenStorageToStream(dob);
  allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

  // Both clients are services and must be initialized before being started.
  AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
  amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
  amRMClient.init(conf);
  amRMClient.start();

  containerListener = createNMCallbackHandler();
  nmClientAsync = new NMClientAsyncImpl(containerListener);
  nmClientAsync.init(conf);
  nmClientAsync.start();

  // TODO set up a local RPC server (and a protocol) so clients can query this
  // app master directly, and register its port with the RM.

  // Register self with ResourceManager. This will start heartbeating to the RM.
  appMasterHostname = NetUtils.getHostname();
  RegisterApplicationMasterResponse response = amRMClient
      .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
          appMasterTrackingUrl);

  // Dump out information about cluster capability as seen by the
  // resource manager
  int maxMem = response.getMaximumResourceCapability().getMemory();
  LOG.info("Max mem capabililty of resources in this cluster " + maxMem);

  // A resource ask cannot exceed the max.
  if (containerMemory > maxMem) {
    LOG.info("Container memory specified above max threshold of cluster."
        + " Using max value." + ", specified=" + containerMemory + ", max="
        + maxMem);
    containerMemory = maxMem;
  }

  // Ask for all containers up front; allocations and completions arrive
  // asynchronously through RMCallbackHandler.
  for (int i = 0; i < numTotalContainers; ++i) {
    ContainerRequest containerAsk = setupContainerAskForRM();
    amRMClient.addContainerRequest(containerAsk);
  }
  numRequestedContainers.set(numTotalContainers);

  // Keep waiting until all the containers are launched and the shell command
  // has executed on them (regardless of success/failure), or a callback
  // tells us to stop.
  boolean interrupted = false;
  while (!done && (numCompletedContainers.get() != numTotalContainers)) {
    try {
      Thread.sleep(200);
    } catch (InterruptedException ex) {
      LOG.warn("Interrupted while waiting for containers to complete", ex);
      interrupted = true;
      break;
    }
  }
  finish();
  if (interrupted) {
    // Restore the interrupt only after cleanup, so that finish() could still
    // unregister from the RM.
    Thread.currentThread().interrupt();
  }
  return success;
}

@VisibleForTesting
NMCallbackHandler createNMCallbackHandler() {
  // Package-private and @VisibleForTesting, presumably so tests can supply
  // their own handler.
  return new NMCallbackHandler(this);
}

/**
 * Shuts the application down. It waits up to 10 seconds for each launch
 * thread, stops all containers, and unregisters from the RM with a final
 * status derived from the container counters. The outcome is recorded in
 * {@code success}.
 */
private void finish() {
  // Join all launched threads
  // needed for when we time out
  // and we need to release containers
  boolean interrupted = false;
  for (Thread launchThread : launchThreads) {
    try {
      launchThread.join(10000);
    } catch (InterruptedException e) {
      LOG.info("Exception thrown in thread join: " + e.getMessage(), e);
      // Remember the interrupt but keep going so cleanup still happens.
      interrupted = true;
    }
  }

  // When the application completes, it should stop all running containers
  LOG.info("Application completed. Stopping running containers");
  nmClientAsync.stop();

  // When the application completes, it should send a finish application
  // signal to the RM
  LOG.info("Application completed. Signalling finish to RM");

  FinalApplicationStatus appStatus;
  String appMessage = null;
  success = true;
  if (numFailedContainers.get() == 0 &&
      numCompletedContainers.get() == numTotalContainers) {
    appStatus = FinalApplicationStatus.SUCCEEDED;
  } else {
    appStatus = FinalApplicationStatus.FAILED;
    appMessage = "Diagnostics." + ", total=" + numTotalContainers
        + ", completed=" + numCompletedContainers.get() + ", allocated="
        + numAllocatedContainers.get() + ", failed="
        + numFailedContainers.get();
    success = false;
  }
  try {
    amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
  } catch (YarnException ex) {
    LOG.error("Failed to unregister application", ex);
  } catch (IOException e) {
    LOG.error("Failed to unregister application", e);
  }

  amRMClient.stop();
  if (interrupted) {
    Thread.currentThread().interrupt();
  }
}

/**
 * Handles RM heartbeat callbacks: tracks completed containers, re-requests
 * containers the framework aborted, and launches newly allocated ones.
 */
private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {

  // addContainerRequest on amRMClient is unchecked (run() suppresses the same
  // warning); presumably amRMClient is declared as a raw type. TODO confirm.
  @SuppressWarnings("unchecked")
  @Override
  public void onContainersCompleted(List<ContainerStatus> completedContainers) {
    LOG.info("Got response from RM for container ask, completedCnt="
        + completedContainers.size());
    for (ContainerStatus containerStatus : completedContainers) {
      LOG.info("Got container status for containerID="
          + containerStatus.getContainerId() + ", state="
          + containerStatus.getState() + ", exitStatus="
          + containerStatus.getExitStatus() + ", diagnostics="
          + containerStatus.getDiagnostics());

      // non complete containers should not be here
      assert (containerStatus.getState() == ContainerState.COMPLETE);

      // increment counters for completed/failed containers
      int exitStatus = containerStatus.getExitStatus();
      if (0 != exitStatus) {
        // container failed
        if (ContainerExitStatus.ABORTED != exitStatus) {
          // shell script failed
          // counts as completed
          numCompletedContainers.incrementAndGet();
          numFailedContainers.incrementAndGet();
        } else {
          // container was killed by framework, possibly preempted
          // we should re-try as the container was lost for some reason
          numAllocatedContainers.decrementAndGet();
          numRequestedContainers.decrementAndGet();
          // we do not need to release the container as it would be done
          // by the RM
        }
      } else {
        // container completed successfully
        numCompletedContainers.incrementAndGet();
        LOG.info("Container completed successfully." + ", containerId="
            + containerStatus.getContainerId());
      }
    }

    // ask for more containers if any were lost
    int askCount = numTotalContainers - numRequestedContainers.get();
    numRequestedContainers.addAndGet(askCount);
    for (int i = 0; i < askCount; ++i) {
      ContainerRequest containerAsk = setupContainerAskForRM();
      amRMClient.addContainerRequest(containerAsk);
    }

    if (numCompletedContainers.get() == numTotalContainers) {
      done = true;
    }
  }

  @Override
  public void onContainersAllocated(List<Container> allocatedContainers) {
    LOG.info("Got response from RM for container ask, allocatedCnt="
        + allocatedContainers.size());
    numAllocatedContainers.addAndGet(allocatedContainers.size());
    for (Container allocatedContainer : allocatedContainers) {
      LOG.info("Launching shell command on a new container."
          + ", containerId=" + allocatedContainer.getId()
          + ", containerNode=" + allocatedContainer.getNodeId().getHost()
          + ":" + allocatedContainer.getNodeId().getPort()
          + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
          + ", containerResourceMemory"
          + allocatedContainer.getResource().getMemory());

      LaunchContainerRunnable runnableLaunchContainer =
          new LaunchContainerRunnable(allocatedContainer, containerListener);
      Thread launchThread = new Thread(runnableLaunchContainer);

      // launch and start the container on a separate thread to keep
      // the main thread unblocked
      // as all containers may not be allocated at one go.
      launchThreads.add(launchThread);
      launchThread.start();
    }
  }

  @Override
  public void onShutdownRequest() {
    done = true;
  }

  @Override
  public void onNodesUpdated(List<NodeReport> updatedNodes) {}

  @Override
  public float getProgress() {
    // set progress to deliver to RM on next heartbeat
    float progress = (float) numCompletedContainers.get()
        / numTotalContainers;
    return progress;
  }

  @Override
  public void onError(Throwable e) {
    LOG.error("Error reported by the RM client, stopping the application", e);
    done = true;
  }
}

@VisibleForTesting
static class NMCallbackHandler
    implements NMClientAsync.CallbackHandler {

  // Containers we asked NodeManagers to start, keyed by id, so that a status
  // query can be issued once a container has started.
  private final ConcurrentMap<ContainerId, Container> containers =
      new ConcurrentHashMap<ContainerId, Container>();
  private final ApplicationMaster applicationMaster;

  public NMCallbackHandler(ApplicationMaster applicationMaster) {
    this.applicationMaster = applicationMaster;
  }

  public void addContainer(ContainerId containerId, Container container) {
    containers.putIfAbsent(containerId, container);
  }

  @Override
  public void onContainerStopped(ContainerId containerId) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Succeeded to stop Container " + containerId);
    }
    containers.remove(containerId);
  }

  @Override
  public void onContainerStatusReceived(ContainerId containerId,
      ContainerStatus containerStatus) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Container Status: id=" + containerId + ", status=" +
          containerStatus);
    }
  }

  @Override
  public void onContainerStarted(ContainerId containerId,
      Map<String, ByteBuffer> allServiceResponse) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Succeeded to start Container " + containerId);
    }
    Container container = containers.get(containerId);
    if (container != null) {
      applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
    }
  }

  @Override
  public void onStartContainerError(ContainerId containerId, Throwable t) {
    LOG.error("Failed to start Container " + containerId, t);
    containers.remove(containerId);
    // A container that never started will never be reported as completed by
    // the RM callback, so count it here or run() waits forever.
    applicationMaster.numCompletedContainers.incrementAndGet();
    applicationMaster.numFailedContainers.incrementAndGet();
  }

  @Override
  public void onGetContainerStatusError(
      ContainerId containerId, Throwable t) {
    LOG.error("Failed to query the status of Container " + containerId, t);
  }

  @Override
  public void onStopContainerError(ContainerId containerId, Throwable t) {
    LOG.error("Failed to stop Container " + containerId, t);
    containers.remove(containerId);
  }
}
/**
* Thread to connect to the {@link ContainerManagementProtocol} and launch the container
* that will execute the shell command.
*/
private class LaunchContainerRunnable implements Runnable {

  // Allocated container
  private final Container container;
  // Callback handler notified about this container's NM events
  private final NMCallbackHandler containerListener;

  /**
   * @param lcontainer Allocated container
   * @param containerListener Callback handler of the container
   */
  public LaunchContainerRunnable(
      Container lcontainer, NMCallbackHandler containerListener) {
    this.container = lcontainer;
    this.containerListener = containerListener;
  }

  /**
   * Sets up the container launch context for the shell command and dispatches
   * an asynchronous start request to the container's NodeManager.
   */
  @Override
  public void run() {
    LOG.info("Setting up container launch container for containerid="
        + container.getId());
    ContainerLaunchContext ctx = Records
        .newRecord(ContainerLaunchContext.class);

    // Set the environment
    ctx.setEnvironment(shellEnv);

    // Set the local resources. The container for the eventual shell command
    // needs its own local resources too: if a shell script is specified, it
    // must be copied and made available to the container.
    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
    if (!shellScriptPath.isEmpty()) {
      LocalResource shellRsrc = Records.newRecord(LocalResource.class);
      // Fully qualified so that this block does not rely on extra imports.
      shellRsrc.setType(org.apache.hadoop.yarn.api.records.LocalResourceType.FILE);
      shellRsrc.setVisibility(
          org.apache.hadoop.yarn.api.records.LocalResourceVisibility.APPLICATION);
      try {
        shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(
            shellScriptPath)));
      } catch (URISyntaxException e) {
        LOG.error("Error when trying to use shell script path specified"
            + " in env, path=" + shellScriptPath, e);
        // A failure scenario on bad input such as invalid shell script path.
        // We cannot continue launching the container, so count it as a
        // completed, failed container so run() does not wait on it forever.
        // NOTE: the allocated container itself is not released to the RM here.
        numCompletedContainers.incrementAndGet();
        numFailedContainers.incrementAndGet();
        return;
      }
      shellRsrc.setTimestamp(shellScriptPathTimestamp);
      shellRsrc.setSize(shellScriptPathLen);
      localResources.put(ExecShellStringPath, shellRsrc);
    }
    ctx.setLocalResources(localResources);

    // Build the command: <shell command> [script] <shell args> 1>stdout 2>stderr
    List<CharSequence> vargs = new ArrayList<CharSequence>(5);
    vargs.add(shellCommand);
    if (!shellScriptPath.isEmpty()) {
      vargs.add(ExecShellStringPath);
    }
    vargs.add(shellArgs);
    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");

    // Get final commmand
    StringBuilder command = new StringBuilder();
    for (CharSequence str : vargs) {
      command.append(str).append(" ");
    }
    List<String> commands = new ArrayList<String>();
    commands.add(command.toString());
    ctx.setCommands(commands);

    // Set up tokens for the container too. Today, for normal shell commands,
    // the container in distribute-shell doesn't need any tokens. We are
    // populating them mainly for NodeManagers to be able to download any
    // files in the distributed file-system. The tokens are otherwise also
    // useful in cases, for e.g., when one is running a "hadoop dfs" command
    // inside the distributed shell.
    // duplicate() gives each container its own buffer position.
    ctx.setTokens(allTokens.duplicate());

    containerListener.addContainer(container.getId(), container);
    nmClientAsync.startContainerAsync(container, ctx);
  }
}
/**
* Setup the request that will be sent to the RM for the container ask.
* No host or rack constraints are given, so any node will do for the
* distributed shell app.
*
* @return the {@link ContainerRequest} to be sent to the RM, using the
*         configured priority and container memory
*/
private ContainerRequest setupContainerAskForRM() {
  // set the priority for the request
  Priority pri = Records.newRecord(Priority.class);
  // TODO - what is the range for priority? how to decide?
  pri.setPriority(requestPriority);

  // Set up resource type requirements
  // For now, only memory is supported so we set memory requirements
  Resource capability = Records.newRecord(Resource.class);
  capability.setMemory(containerMemory);

  // null nodes/racks: no locality preference.
  ContainerRequest request = new ContainerRequest(capability, null, null,
      pri);
  LOG.info("Requested container ask: " + request.toString());
  return request;
}
