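If you repost this article, please credit the author and the source. My knowledge is limited, so please point out any errors you find.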
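Note: we downloaded the hadoop-2.7.3-src source tree. In this version the default scheduler is the Capacity Scheduler. Around the 2.0.2-alpha release, someone reported a bug in the FIFO scheduler, and the community switched the default scheduler from FIFO to Capacity. Reference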
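In Hadoop, the scheduler is a pluggable module. Users can design a scheduler to suit their own workload and name it in the configuration file, so that the Hadoop cluster loads it at startup. Hadoop currently ships with several schedulers: FIFO (the default in Hadoop 1), Capacity Scheduler, and FairScheduler. These often cannot meet a company's more complex requirements, so teams frequently end up developing their own scheduler. This article introduces the basics of writing a Hadoop scheduler. Reference 1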
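Hadoop 1 scheduling framework: the scheduler is loaded and invoked by the JobTracker. Users specify it through the mapred.jobtracker.taskScheduler property in mapred-site.xml. Analyzing the Hadoop 1 scheduling framework comes down to the relationship between two key classes, TaskScheduler and JobTracker.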
Hadoop 2 scheduling framework: the scheduler is loaded and invoked by the ResourceManager. Users specify it through the yarn.resourcemanager.scheduler.class property in yarn-site.xml. The default is org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; the FIFO scheduler, org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler, or the Fair scheduler, org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler, can be configured instead. This section analyzes the Hadoop 2 scheduling framework. By analogy with Hadoop 1, all three schedulers extend the abstract class AbstractYarnScheduler <T extends SchedulerApplicationAttempt, N extends SchedulerNode>, which plays a role similar to TaskScheduler in Hadoop 1. To write your own scheduler, extend the abstract class AbstractYarnScheduler.
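The pluggable loading described above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the ResourceManager's actual code: DemoScheduler, DemoFifoScheduler, DemoCapacityScheduler, and PluggableSchedulerDemo are hypothetical stand-ins, and a plain Map replaces Hadoop's Configuration. Only the property name yarn.resourcemanager.scheduler.class comes from the text.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a scheduler contract; not a Hadoop type.
interface DemoScheduler {
    String name();
}

// Hypothetical implementations, used only to show the loading mechanism.
class DemoFifoScheduler implements DemoScheduler {
    public String name() { return "fifo"; }
}

class DemoCapacityScheduler implements DemoScheduler {
    public String name() { return "capacity"; }
}

class PluggableSchedulerDemo {
    // Property name taken from the text; the Map stands in for yarn-site.xml.
    static final String SCHEDULER_KEY = "yarn.resourcemanager.scheduler.class";

    // Instantiate the class named in the configuration, or the default if unset.
    static DemoScheduler load(Map<String, String> conf) {
        String className =
            conf.getOrDefault(SCHEDULER_KEY, DemoCapacityScheduler.class.getName());
        try {
            return Class.forName(className)
                .asSubclass(DemoScheduler.class)
                .getDeclaredConstructor()
                .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot load scheduler " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(load(conf).name());   // no property set: the default is used
        conf.put(SCHEDULER_KEY, DemoFifoScheduler.class.getName());
        System.out.println(load(conf).name());   // property set: the named class is used
    }
}
```

Loading by class name is what makes the module pluggable: swapping schedulers needs a configuration change, not a code change in the caller.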
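In Hadoop 2, MapReduce was overhauled into what is called MapReduce 2.0 (MRv2), or YARN. It splits the JobTracker's two responsibilities, resource management and task life-cycle management (including scheduling and monitoring), into two separate services: a ResourceManager that manages all resources, and a per-application ApplicationMaster. The ResourceManager allocates compute resources to applications, while each ApplicationMaster manages, schedules, and coordinates its own application. An application is either a single job in the classic MapReduce sense or a DAG (directed acyclic graph) of such jobs. The ResourceManager, together with the NodeManager service on each machine, which manages the user processes on that machine, forms the computation framework. The per-application ApplicationMaster is in effect a framework-specific library that negotiates resources from the ResourceManager and works with the NodeManager(s) to execute and monitor tasks. Reference 2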
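Hadoop 2.0 addresses the limits of Hadoop 1.0 MapReduce in scalability and multi-framework support. It separates the JobTracker's resource management and job control functions into two components, ResourceManager and ApplicationMaster: the ResourceManager allocates resources for all applications, while each ApplicationMaster manages a single application. The result is YARN, a new general-purpose resource management framework. On YARN, users can run many kinds of applications (no longer only MapReduce, as in 1.0), from offline computation with MapReduce to online (stream) processing with Storm. Hadoop 2.0 corresponds to Apache Hadoop 0.23.x and 2.x, and to CDH4.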
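Architecture diagram:
The ResourceManager has two main components: the Scheduler and the ApplicationsManager.
Scheduler: allocates resources to applications. It does no monitoring or status tracking of applications, and it does not guarantee to restart tasks that fail because of application or hardware errors.
ApplicationsManager: accepts job submissions, and coordinates and provides restarts of the ApplicationMaster container when it fails.
NodeManager: the ResourceManager's agent on each machine. It manages containers, monitors their resource usage (CPU, memory, disk, network), and reports that usage to the ResourceManager/Scheduler.
ApplicationMaster: each application's ApplicationMaster requests resources from the Scheduler, tracks their usage, and monitors task progress.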
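The reporting relationship between NodeManagers and the ResourceManager can be sketched as follows. This is a toy model under assumptions, not YARN code: ToyResourceTracker and its methods are hypothetical names, memory is the only resource tracked, and a real heartbeat carries much more information.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified view of the ResourceManager side of node reporting.
class ToyResourceTracker {
    // Latest free memory (MB) reported by each node, keyed by node id.
    private final Map<String, Integer> freeMbByNode = new HashMap<>();

    // Record the latest report from one node; a newer report replaces the older one.
    void heartbeat(String nodeId, int freeMb) {
        freeMbByNode.put(nodeId, freeMb);
    }

    // Number of nodes that have reported at least once.
    int getNumClusterNodes() {
        return freeMbByNode.size();
    }

    // Sum of the most recent free-memory figures across all nodes.
    int getClusterFreeMb() {
        int sum = 0;
        for (int freeMb : freeMbByNode.values()) {
            sum += freeMb;
        }
        return sum;
    }

    public static void main(String[] args) {
        ToyResourceTracker tracker = new ToyResourceTracker();
        tracker.heartbeat("node-1", 4096);
        tracker.heartbeat("node-2", 2048);
        tracker.heartbeat("node-1", 1024);   // node-1 reports again after starting containers
        System.out.println(tracker.getNumClusterNodes() + " nodes, "
            + tracker.getClusterFreeMb() + " MB free");
    }
}
```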
1 Scheduler
To analyze a scheduler, we first look at its parent class, that class's own parent, and the interfaces they implement: AbstractService, YarnScheduler, ResourceScheduler, and AbstractYarnScheduler, shown below:
AbstractService.java is located at hadoop-2.7.3-src/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/AbstractService.java
package org.apache.hadoop.service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;

import com.google.common.annotations.VisibleForTesting;

/**
* This is the base implementation class for services.
*/
@Public
@Evolving
public abstract class AbstractService implements Service {

private static final Log LOG = LogFactory.getLog(AbstractService.class);

/**
* Service name.
*/
private final String name;

/** service state */
private final ServiceStateModel stateModel;

/**
* Service start time. Will be zero until the service is started.
*/
private long startTime;

/**
* The configuration. Will be null until the service is initialized.
*/
private volatile Configuration config;

/**
* List of state change listeners; it is final to ensure
* that it will never be null.
*/
private final ServiceOperations.ServiceListeners listeners
= new ServiceOperations.ServiceListeners();

/**
* Static listeners to all events across all services
*/
private static ServiceOperations.ServiceListeners globalListeners
= new ServiceOperations.ServiceListeners();

/**
* The cause of any failure -will be null.
* if a service did not stop due to a failure.
*/
private Exception failureCause;

/**
* the state in which the service was when it failed.
* Only valid when the service is stopped due to a failure
*/
private STATE failureState = null;

/**
* object used to co-ordinate {@link #waitForServiceToStop(long)}
* across threads.
*/
private final AtomicBoolean terminationNotification =
new AtomicBoolean(false);

/**
* History of lifecycle transitions
*/
private final List<LifecycleEvent> lifecycleHistory
= new ArrayList<LifecycleEvent>(5);

/**
* Map of blocking dependencies
*/
private final Map<String,String> blockerMap = new HashMap<String, String>();

private final Object stateChangeLock = new Object();

/**
* Construct the service.
* @param name service name
*/
public AbstractService(String name) {
this.name = name;
stateModel = new ServiceStateModel(name);
}

/**
* Get the current service state.
* @return the state of the service
*/
@Override
public final STATE getServiceState() {
return stateModel.getState();
}

/**
* Get the first exception raised during the service failure. If null,
* no exception was logged.
* @return the failure logged during a transition to the stopped state
*/
@Override
public final synchronized Throwable getFailureCause() {
return failureCause;
}

/**
* Get the state in which the failure in {@link #getFailureCause()} occurred.
* @return the state, or null if there was no failure
*/
@Override
public synchronized STATE getFailureState() {
return failureState;
}

/**
* Set the configuration for this service.
* This method is called during {@link #init(Configuration)}
* and should only be needed if for some reason a service implementation
* needs to override that initial setting -for example replacing
* it with a new subclass of {@link Configuration}
* @param conf new configuration.
*/
protected void setConfig(Configuration conf) {
this.config = conf;
}

/**
* {@inheritDoc}
* This invokes {@link #serviceInit}
* @param conf the configuration of the service. This must not be null
* @throws ServiceStateException if the configuration was null,
* the state change not permitted, or something else went wrong
*/
// A subclass's serviceInit initializes the services it needs: it creates
// the corresponding service objects and adds them to its service list.
@Override
public void init(Configuration conf) {
// reject a null configuration
if (conf == null) {
throw new ServiceStateException("Cannot initialize service "
+ getName() + ": null configuration");
}
// nothing to do if the service is already initialized
if (isInState(STATE.INITED)) {
return;
}
synchronized (stateChangeLock) {
if (enterState(STATE.INITED) != STATE.INITED) {
setConfig(conf);
try {
// initialize the service; this dispatches to the subclass override
serviceInit(config);
if (isInState(STATE.INITED)) {
//if the service ended up here during init,
//notify the listeners
notifyListeners();
}
} catch (Exception e) {
noteFailure(e);
ServiceOperations.stopQuietly(LOG, this);
throw ServiceStateException.convert(e);
}
}
}
}

/**
* {@inheritDoc}
* @throws ServiceStateException if the current service state does not permit
* this action
*/
// Start the service.
@Override
public void start() {
if (isInState(STATE.STARTED)) {
return;
}
//enter the started state
synchronized (stateChangeLock) {
if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
try {
startTime = System.currentTimeMillis();
serviceStart();
if (isInState(STATE.STARTED)) {
//if the service started (and isn't now in a later state), notify
if (LOG.isDebugEnabled()) {
LOG.debug("Service " + getName() + " is started");
}
notifyListeners();
}
} catch (Exception e) {
noteFailure(e);
ServiceOperations.stopQuietly(LOG, this);
throw ServiceStateException.convert(e);
}
}
}
}

/**
* {@inheritDoc}
*/
// Stop the service.
@Override
public void stop() {
if (isInState(STATE.STOPPED)) {
return;
}
synchronized (stateChangeLock) {
if (enterState(STATE.STOPPED) != STATE.STOPPED) {
try {
serviceStop();
} catch (Exception e) {
//stop-time exceptions are logged if they are the first one,
noteFailure(e);
throw ServiceStateException.convert(e);
} finally {
//report that the service has terminated
terminationNotification.set(true);
synchronized (terminationNotification) {
terminationNotification.notifyAll();
}
//notify anything listening for events
notifyListeners();
}
} else {
//already stopped: note it
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring re-entrant call to stop()");
}
}
}
}

/**
* Relay to {@link #stop()}
* @throws IOException
*/
@Override
public final void close() throws IOException {
stop();
}

/**
* Failure handling: record the exception
* that triggered it -if there was not one already.
* Services are free to call this themselves.
* @param exception the exception
*/
protected final void noteFailure(Exception exception) {
if (LOG.isDebugEnabled()) {
LOG.debug("noteFailure " + exception, null);
}
if (exception == null) {
//make sure failure logic doesn't itself cause problems
return;
}
//record the failure details, and log it
synchronized (this) {
if (failureCause == null) {
failureCause = exception;
failureState = getServiceState();
LOG.info("Service " + getName()
+ " failed in state " + failureState
+ "; cause: " + exception,
exception);
}
}
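}

/**
* Block waiting for the service to stop; uses the termination notification
* object to do so.
* This method only returns after all the service stop actions have been
* executed (to success or failure), or the timeout has elapsed.
* This method can be called before the service is inited or started; this
* is to eliminate any race condition with the service stopping before
* this event occurs.
*/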
@Override
public final boolean waitForServiceToStop(long timeout) {
boolean completed = terminationNotification.get();
while (!completed) {
try {
synchronized(terminationNotification) {
terminationNotification.wait(timeout);
}
// here there has been a timeout, the object has terminated,
// or there has been a spurious wakeup (which we ignore)
completed = true;
} catch (InterruptedException e) {
// interrupted; have another look at the flag
completed = terminationNotification.get();
}
}
return terminationNotification.get();
}

/* ===================================================================== */
/* Override Points */
/* ===================================================================== */

/**
* All initialization code needed by a service.
*
* This method will only ever be called once during the lifecycle of
* a specific service instance.
*
* Implementations do not need to be synchronized as the logic
* in {@link #init(Configuration)} prevents re-entrancy.
*
* The base implementation checks to see if the subclass has created
* a new configuration instance, and if so, updates the base class value
* @param conf configuration
* @throws Exception on a failure -these will be caught,
* possibly wrapped, and will trigger a service stop
*/
protected void serviceInit(Configuration conf) throws Exception {
if (conf != config) {
LOG.debug("Config has been overridden during init");
setConfig(conf);
}
}

/**
* Actions called during the INITED to STARTED transition.
*
* This method will only ever be called once during the lifecycle of
* a specific service instance.
*
* Implementations do not need to be synchronized as the logic
* in {@link #start()} prevents re-entrancy.
*
* @throws Exception if needed -these will be caught,
* wrapped, and trigger a service stop
*/
protected void serviceStart() throws Exception {
}

/**
* Actions called during the transition to the STOPPED state.
*
* This method will only ever be called once during the lifecycle of
* a specific service instance.
*
* Implementations do not need to be synchronized as the logic
* in {@link #stop()} prevents re-entrancy.
*
* Implementations MUST write this to be robust against failures, including
* checks for null references -and for the first failure to not stop other
* attempts to shut down parts of the service.
*
* @throws Exception if needed -these will be caught and logged.
*/
protected void serviceStop() throws Exception {
}

/**
* Register a listener to the service state change events.
* If the supplied listener is already listening to this service,
* this method is a no-op.
* @param l a new listener
*/
@Override
public void registerServiceListener(ServiceStateChangeListener l) {
listeners.add(l);
}

/**
* Unregister a previously registered listener of the service state
* change events. No-op if the listener is already unregistered.
* @param l the listener to unregister
*/
@Override
public void unregisterServiceListener(ServiceStateChangeListener l) {
listeners.remove(l);
}

/**
* Register a global listener, which receives notifications
* from the state change events of all services in the JVM
* @param l listener
*/
public static void registerGlobalListener(ServiceStateChangeListener l) {
globalListeners.add(l);
}

/**
* unregister a global listener.
* @param l listener to unregister
* @return true if the listener was found (and then deleted)
*/
public static boolean unregisterGlobalListener(ServiceStateChangeListener l) {
return globalListeners.remove(l);
}

/**
* Package-scoped method for testing -resets the global listener list
*/
@VisibleForTesting
static void resetGlobalListeners() {
globalListeners.reset();
}

/**
* Get the name of this service.
* @return the service name
*/
@Override
public String getName() {
return name;
}

/**
* Get the configuration of this service.
* This is normally not a clone and may be manipulated, though there are no
* guarantees as to what the consequences of such actions may be.
* @return the current configuration, unless a specific implementation
* chooses otherwise.
*/
@Override
public synchronized Configuration getConfig() {
return config;
}

/**
* Get the service start time.
* @return the start time of the service. This will be zero if the service
* has not yet been started.
*/
@Override
public long getStartTime() {
return startTime;
}

/**
* Notify local and global listeners of state changes.
* Exceptions raised by listeners are NOT passed up.
*/
private void notifyListeners() {
try {
listeners.notifyListeners(this);
globalListeners.notifyListeners(this);
} catch (Throwable e) {
LOG.warn("Exception while notifying listeners of " + this + ": " + e,
e);
}
}

/**
* Add a state change event to the lifecycle history
*/
private void recordLifecycleEvent() {
LifecycleEvent event = new LifecycleEvent();
event.time = System.currentTimeMillis();
event.state = getServiceState();
lifecycleHistory.add(event);
}

/**
* Get a snapshot of the lifecycle history; it is a static list
* @return a possibly empty but never null list of lifecycle events.
*/
@Override
public synchronized List<LifecycleEvent> getLifecycleHistory() {
return new ArrayList<LifecycleEvent>(lifecycleHistory);
}

/**
* Enter a state; record this via {@link #recordLifecycleEvent}
* and log at the info level.
* @param newState the proposed new state
* @return the original state
* it wasn't already in that state, and the state model permits state re-entrancy.
*/
private STATE enterState(STATE newState) {
assert stateModel != null : "null state in " + name + " " + this.getClass();
STATE oldState = stateModel.enterState(newState);
if (oldState != newState) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Service: " + getName() + " entered state " + getServiceState());
}
recordLifecycleEvent();
}
return oldState;
}

/**
* Query whether the service is in a specific state.
* @param expected the expected state
*/
@Override
public final boolean isInState(Service.STATE expected) {
return stateModel.isInState(expected);
}

@Override
public String toString() {
return "Service " + name + " in state " + stateModel;
}

/**
* Put a blocker to the blocker map -replacing any
* with the same name.
* @param name blocker name
* @param details any specifics on the block. This must be non-null.
*/
protected void putBlocker(String name, String details) {
synchronized (blockerMap) {
blockerMap.put(name, details);
}
}

/**
* Remove a blocker from the blocker map -
* this is a no-op if the blocker is not present
* @param name the name of the blocker
*/
public void removeBlocker(String name) {
synchronized (blockerMap) {
blockerMap.remove(name);
}
}

/**
* Get the blockers on a service -remote dependencies
* that are stopping the service from being <i>live</i>.
* @return a (snapshotted) map of blocker name-&gt;description values
*/
@Override
public Map<String, String> getBlockers() {
synchronized (blockerMap) {
Map<String, String> map = new HashMap<String, String>(blockerMap);
return map;
}
}
}
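The lifecycle pattern in AbstractService, where public init(), start(), and stop() guard the state transition and then delegate to protected hooks, can be sketched in a much smaller form. This is a simplified model under assumptions, not the Hadoop class: MiniService, CountingService, and LifecycleDemo are hypothetical names, and transition validation, listeners, and failure handling are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the init/start/stop template methods.
abstract class MiniService {
    enum State { NOTINITED, INITED, STARTED, STOPPED }

    private final Object stateChangeLock = new Object();
    private State state = State.NOTINITED;
    private final List<State> history = new ArrayList<>();

    // Move to the new state and return the previous one. A real change is
    // recorded in the history; re-entering the current state is not.
    private State enterState(State next) {
        State old = state;
        if (old != next) {
            state = next;
            history.add(next);
        }
        return old;
    }

    public final void init() {
        synchronized (stateChangeLock) {
            if (enterState(State.INITED) != State.INITED) {
                serviceInit();   // runs only on the first transition
            }
        }
    }

    public final void start() {
        synchronized (stateChangeLock) {
            if (enterState(State.STARTED) != State.STARTED) {
                serviceStart();
            }
        }
    }

    public final void stop() {
        synchronized (stateChangeLock) {
            if (enterState(State.STOPPED) != State.STOPPED) {
                serviceStop();
            }
        }
    }

    // Snapshot of the transitions so far, so callers cannot mutate the original.
    public final List<State> getHistory() {
        synchronized (stateChangeLock) {
            return new ArrayList<>(history);
        }
    }

    // Override points: subclasses put their real work here.
    protected void serviceInit() { }
    protected void serviceStart() { }
    protected void serviceStop() { }
}

// Hypothetical subclass that counts how often each hook runs.
class CountingService extends MiniService {
    int inits;
    int starts;
    int stops;

    @Override protected void serviceInit() { inits++; }
    @Override protected void serviceStart() { starts++; }
    @Override protected void serviceStop() { stops++; }
}

class LifecycleDemo {
    public static void main(String[] args) {
        CountingService s = new CountingService();
        s.init();
        s.init();    // repeated call: the hook does not run again
        s.start();
        s.stop();
        s.stop();    // repeated call: the hook does not run again
        System.out.println(s.getHistory() + " inits=" + s.inits + " stops=" + s.stops);
    }
}
```

Because the public methods are final and hold the lock while delegating, a subclass hook runs at most once per transition and does not need its own synchronization, which matches what the Javadoc above says about re-entrancy.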
YarnScheduler.java is located at hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;

import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;

/**
* This interface is used by the components to talk to the
* scheduler for allocating of resources, cleaning up resources.
*
*/
public interface YarnScheduler extends EventHandler<SchedulerEvent> {

/**
* Get queue information
* @param queueName queue name
* @param includeChildQueues include child queues?
* @param recursive get children queues?
* @return queue information
* @throws IOException
*/
@Public
@Stable
public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
boolean recursive) throws IOException;

/**
* Get acls for queues for current user.
* @return acls for queues for current user
*/
@Public
@Stable
public List<QueueUserACLInfo> getQueueUserAclInfo();

/**
* Get the whole resource capacity of the cluster.
* @return the whole resource capacity of the cluster.
*/
@LimitedPrivate("yarn")
@Unstable
public Resource getClusterResource();

/**
* Get minimum allocatable {@link Resource}.
* @return minimum allocatable resource
*/
@Public
@Stable
public Resource getMinimumResourceCapability();

/**
* Get maximum allocatable {@link Resource} at the cluster level.
* @return maximum allocatable resource
*/
@Public
@Stable
public Resource getMaximumResourceCapability();

/**
* Get maximum allocatable {@link Resource} for the queue specified.
* @param queueName queue name
* @return maximum allocatable resource
*/
@Public
@Stable
public Resource getMaximumResourceCapability(String queueName);

@LimitedPrivate("yarn")
@Evolving
ResourceCalculator getResourceCalculator();

/**
* Get the number of nodes available in the cluster.
* @return the number of available nodes.
*/
@Public
@Stable
public int getNumClusterNodes();

/**
* The main api between the ApplicationMaster and the Scheduler.
* The ApplicationMaster is updating his future resource requirements
* and may release containers he doesn't need.
*
* @param appAttemptId
* @param ask
* @param release
* @param blacklistAdditions
* @param blacklistRemovals
* @return the {@link Allocation} for the application
*/
@Public
@Stable
Allocation
allocate(ApplicationAttemptId appAttemptId,
List<ResourceRequest> ask,
List<ContainerId> release,
List<String> blacklistAdditions,
List<String> blacklistRemovals);

/**
* Get node resource usage report.
* @param nodeId
* @return the {@link SchedulerNodeReport} for the node or null
* if nodeId does not point to a defined node.
*/
@LimitedPrivate("yarn")
@Stable
public SchedulerNodeReport getNodeReport(NodeId nodeId);

/**
* Get the Scheduler app for a given app attempt Id.
* @param appAttemptId the id of the application attempt
* @return SchedulerApp for this given attempt.
*/
@LimitedPrivate("yarn")
@Stable
SchedulerAppReport getSchedulerAppInfo(ApplicationAttemptId appAttemptId);

/**
* Get a resource usage report from a given app attempt ID.
* @param appAttemptId the id of the application attempt
* @return resource usage report for this given attempt
*/
@LimitedPrivate("yarn")
@Evolving
ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId appAttemptId);

/**
* Get the root queue for the scheduler.
* @return the root queue for the scheduler.
*/
@LimitedPrivate("yarn")
@Evolving
QueueMetrics getRootQueueMetrics();

/**
* Check if the user has permission to perform the operation.
* If the user has {@link QueueACL#ADMINISTER_QUEUE} permission,
* this user can view/modify the applications in this queue
* @param callerUGI
* @param acl
* @param queueName
* @return <code>true</code> if the user has the permission,
* <code>false</code> otherwise
*/
boolean checkAccess(UserGroupInformation callerUGI,
QueueACL acl, String queueName);

/**
* Gets the apps under a given queue
* @param queueName the name of the queue.
* @return a collection of app attempt ids in the given queue.
*/
@LimitedPrivate("yarn")
@Stable
public List<ApplicationAttemptId> getAppsInQueue(String queueName);

/**
* Get the container for the given containerId.
* @param containerId
* @return the container for the given containerId.
*/
@LimitedPrivate("yarn")
@Unstable
public RMContainer getRMContainer(ContainerId containerId);

/**
* Moves the given application to the given queue
* @param appId
* @param newQueue
* @return the name of the queue the application was placed into
* @throws YarnException if the move cannot be carried out
*/
@LimitedPrivate("yarn")
@Evolving
public String moveApplication(ApplicationId appId, String newQueue)
throws YarnException;

/**
* Completely drain sourceQueue of applications, by moving all of them to
* destQueue.
*
* @param sourceQueue
* @param destQueue
* @throws YarnException
*/
void moveAllApps(String sourceQueue, String destQueue) throws YarnException;

/**
* Terminate all applications in the specified queue.
*
* @param queueName the name of queue to be drained
* @throws YarnException
*/
void killAllAppsInQueue(String queueName) throws YarnException;

/**
* Remove an existing queue. Implementations might limit when a queue could be
* removed (e.g., must have zero entitlement, and no applications running, or
* must be a leaf, etc..).
*
* @param queueName name of the queue to remove
* @throws YarnException
*/
void removeQueue(String queueName) throws YarnException;

/**
* Add to the scheduler a new Queue. Implementations might limit what type of
* queues can be dynamically added (e.g., Queue must be a leaf, must be
* attached to existing parent, must have zero entitlement).
*
* @param newQueue the queue being added.
* @throws YarnException
*/
void addQueue(Queue newQueue) throws YarnException;

/**
* This method increase the entitlement for current queue (must respect
* invariants, e.g., no overcommit of parents, non negative, etc.).
* Entitlement is a general term for weights in FairScheduler, capacity for
* the CapacityScheduler, etc.
*
* @param queue the queue for which we change entitlement
* @param entitlement the new entitlement for the queue (capacity,
* maxCapacity, etc..)
* @throws YarnException
*/
void setEntitlement(String queue, QueueEntitlement entitlement)
throws YarnException;

/**
* Gets the list of names for queues managed by the Reservation System
* @return the list of queues which support reservations
*/
public Set<String> getPlanQueues() throws YarnException;

/**
* Return a collection of the resource types that are considered when
* scheduling
*
* @return an EnumSet containing the resource types
*/
public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes();
}
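The shape of the allocate() call, where one request both asks for new resources and releases containers that are no longer needed, can be sketched with a toy model. This is an illustration under assumptions, not how any YARN scheduler is implemented: ToyScheduler is a hypothetical class, memory in MB is the only resource, and grants are made synchronously inside the call, whereas the real schedulers hand out containers over later heartbeats.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-resource scheduler illustrating the ask/release shape.
class ToyScheduler {
    private int availableMb;
    private int nextContainerId = 1;
    // Size (MB) of each live container, keyed by container id.
    private final Map<Integer, Integer> containerSizes = new HashMap<>();

    ToyScheduler(int clusterMb) {
        this.availableMb = clusterMb;
    }

    // One call: first take back released containers, then grant each ask
    // in order if it still fits. Returns the ids of the granted containers.
    List<Integer> allocate(List<Integer> askMb, List<Integer> release) {
        for (Integer id : release) {
            Integer size = containerSizes.remove(id);
            if (size != null) {          // unknown ids are ignored
                availableMb += size;
            }
        }
        List<Integer> granted = new ArrayList<>();
        for (int mb : askMb) {
            if (mb <= availableMb) {
                availableMb -= mb;
                int id = nextContainerId++;
                containerSizes.put(id, mb);
                granted.add(id);
            }
        }
        return granted;
    }

    int getAvailableMb() {
        return availableMb;
    }

    public static void main(String[] args) {
        ToyScheduler scheduler = new ToyScheduler(4096);
        List<Integer> none = Collections.emptyList();
        // Three asks; only the ones that fit are granted.
        System.out.println(scheduler.allocate(Arrays.asList(1024, 2048, 2048), none));
        // Releasing a container frees room for a new ask in the same call.
        System.out.println(scheduler.allocate(Arrays.asList(2048), Arrays.asList(2)));
    }
}
```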
ResourceScheduler.java is located at hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;

/**
* This interface is the one implemented by the schedulers. It mainly extends
* {@link YarnScheduler}.
*
*/
@LimitedPrivate("yarn")
@Evolving
public interface ResourceScheduler extends YarnScheduler, Recoverable {

/**
* Set RMContext for <code>ResourceScheduler</code>.
* This method should be called immediately after instantiating
* a scheduler once.
* @param rmContext created by ResourceManager
*/
void setRMContext(RMContext rmContext);

/**
* Re-initialize the <code>ResourceScheduler</code>.
* @param conf configuration
* @throws IOException
*/
void reinitialize(Configuration conf, RMContext rmContext) throws IOException;
}
AbstractYarnScheduler.java is located at hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.util.concurrent.SettableFuture; @SuppressWarnings("unchecked")
@Private
@Unstable
public abstract class AbstractYarnScheduler
<T extends SchedulerApplicationAttempt, N extends SchedulerNode>
extends AbstractService implements ResourceScheduler { private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class); // Nodes in the cluster, indexed by NodeId
// 在集群中的节点,用NodeId索引
protected Map<NodeId, N> nodes = new ConcurrentHashMap<NodeId, N>(); // Whole capacity of the cluster
// 集群全部容量
protected Resource clusterResource = Resource.newInstance(, ); protected Resource minimumAllocation;
private Resource maximumAllocation;
private Resource configuredMaximumAllocation;
private int maxNodeMemory = -;
private int maxNodeVCores = -;
private final ReadLock maxAllocReadLock;
private final WriteLock maxAllocWriteLock; private boolean useConfiguredMaximumAllocationOnly = true;
private long configuredMaximumAllocationWaitTime; protected RMContext rmContext; /*
* All schedulers which are inheriting AbstractYarnScheduler should use
* concurrent version of 'applications' map.
*/
protected ConcurrentMap<ApplicationId, SchedulerApplication<T>> applications;
protected int nmExpireInterval;

protected final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);

/**
* Construct the service.
*
* @param name service name
*/
public AbstractYarnScheduler(String name) {
super(name);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.maxAllocReadLock = lock.readLock();
this.maxAllocWriteLock = lock.writeLock();
}

// All initialization code needed by the service.
@Override
public void serviceInit(Configuration conf) throws Exception {
// getInt(name, defaultValue) reads the property as an int. It returns the
// default when the property is unset and throws if the value is not a valid int.
// DEFAULT_RM_NM_EXPIRY_INTERVAL_MS (600000 ms) is how long the RM waits
// before it considers a NodeManager dead.
nmExpireInterval =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
// getLong(name, defaultValue) behaves the same way for long values.
// DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS (10000 ms) is how long
// the RM waits during work-preserving recovery before allocating new containers.
configuredMaximumAllocationWaitTime =
conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
// Start the timer that clears the pending-release cache after nmExpireInterval.
createReleaseCache();
super.serviceInit(conf);
}

public List<Container> getTransferredContainers(
ApplicationAttemptId currentAttempt) {
// Extract the ApplicationId from the ApplicationAttemptId.
ApplicationId appId = currentAttempt.getApplicationId();
// Map.get() returns null when the scheduler does not know this application.
SchedulerApplication<T> app = applications.get(appId);
// Result list; it stays empty unless live containers are found below.
List<Container> containerList = new ArrayList<Container>();
// rmContext implements the RMContext interface (RMContextImpl in this code base);
// getRMApps() returns a ConcurrentMap<ApplicationId, RMApp>.
RMApp appImpl = this.rmContext.getRMApps().get(appId);
// getApplicationSubmissionContext() returns the submission context of this RMApp.
// getUnmanagedAM() tells whether the AM is unmanaged, i.e. not launched by the RM.
if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) {
return containerList;
}
if (app == null) {
return containerList;
}
// Live containers of the application's current attempt.
Collection<RMContainer> liveContainers =
app.getCurrentAppAttempt().getLiveContainers();
// getMasterContainer() is the container the ApplicationMaster runs in;
// getId() returns its globally unique ContainerId.
ContainerId amContainerId =
rmContext.getRMApps().get(appId).getCurrentAppAttempt()
.getMasterContainer().getId();
for (RMContainer rmContainer : liveContainers) {
// Transfer every live container except the AM's own container.
if (!rmContainer.getContainerId().equals(amContainerId)) {
containerList.add(rmContainer.getContainer());
}
}
return containerList;
}

public Map<ApplicationId, SchedulerApplication<T>>
getSchedulerApplications() {
return applications;
}

// Returns the total resource capacity of the cluster.
@Override
public Resource getClusterResource() {
return clusterResource;
}

// Returns the minimum allocatable Resource.
@Override
public Resource getMinimumResourceCapability() {
return minimumAllocation;
}

// Returns the maximum allocatable Resource at the cluster level.
@Override
public Resource getMaximumResourceCapability() {
Resource maxResource;
maxAllocReadLock.lock();
try {
// useConfiguredMaximumAllocationOnly is initialized to true in its declaration.
if (useConfiguredMaximumAllocationOnly) {
// System.currentTimeMillis() is the number of milliseconds since 1970-01-01 00:00 UTC.
// ResourceManager.getClusterTimeStamp() is also taken from System.currentTimeMillis(),
// so the difference is the time elapsed since the RM recorded that timestamp.
// configuredMaximumAllocationWaitTime defaults to 10000 ms.
if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
> configuredMaximumAllocationWaitTime) {
useConfiguredMaximumAllocationOnly = false;
}
// Return a clone of the configured maximum.
maxResource = Resources.clone(configuredMaximumAllocation);
} else {
maxResource = Resources.clone(maximumAllocation);
}
} finally {
maxAllocReadLock.unlock();
}
return maxResource;
}

// Per-queue variant; this base class ignores queueName.
@Override
public Resource getMaximumResourceCapability(String queueName) {
return getMaximumResourceCapability();
}

// Initializes the maximum resource capability.
protected void initMaximumResourceCapability(Resource maximumAllocation) {
maxAllocWriteLock.lock();
try {
if (this.configuredMaximumAllocation == null) {
// Store clones of the given resource.
this.configuredMaximumAllocation = Resources.clone(maximumAllocation);
this.maximumAllocation = Resources.clone(maximumAllocation);
}
} finally {
maxAllocWriteLock.unlock();
}
}

// Called when a container is reported as launched on a node.
protected synchronized void containerLaunchedOnNode(
ContainerId containerId, SchedulerNode node) {
// Get the application for the finished container
SchedulerApplicationAttempt application = getCurrentAttemptForContainer
(containerId);
if (application == null) {
// getApplicationAttemptId() returns the attempt the container was allocated to;
// getApplicationId() returns the application that attempt belongs to.
LOG.info("Unknown application "
+ containerId.getApplicationAttemptId().getApplicationId()
+ " launched container " + containerId + " on node: " + node);
// rmContext.getDispatcher().getEventHandler() returns the RM's EventHandler.
// RMNodeCleanContainerEvent is constructed with RMNodeEventType.CLEANUP_CONTAINER
// and asks the node to clean up the container.
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
return;
}

application.containerLaunchedOnNode(containerId, node.getNodeID());
}

// Returns the current scheduler attempt of the application that the given attempt id belongs to.
public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
// Look up by ApplicationId; Map.get() returns null when the application is unknown.
SchedulerApplication<T> app =
applications.get(applicationAttemptId.getApplicationId());
// getCurrentAppAttempt() returns the scheduler-side attempt object.
return app == null ? null : app.getCurrentAppAttempt();
}

// Returns the scheduler's application report for the given application attempt id.
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId appAttemptId) {
SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId);
if (attempt == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
}
return null;
}
// SchedulerAppReport represents an application attempt and the resources it is using.
return new SchedulerAppReport(attempt);
}

// Returns the resource usage report for the given application attempt id.
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId appAttemptId) {
SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId);
if (attempt == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
}
return null;
}
return attempt.getResourceUsageReport();
}

// Returns the current application attempt that owns the given container.
public T getCurrentAttemptForContainer(ContainerId containerId) {
return getApplicationAttempt(containerId.getApplicationAttemptId());
}

// Returns the RMContainer for the given containerId.
@Override
public RMContainer getRMContainer(ContainerId containerId) {
SchedulerApplicationAttempt attempt =
getCurrentAttemptForContainer(containerId);
// Ask the owning attempt for the container, if that attempt is known.
return (attempt == null) ? null : attempt.getRMContainer(containerId);
}

// Returns the resource usage report for a node.
@Override
public SchedulerNodeReport getNodeReport(NodeId nodeId) {
N node = nodes.get(nodeId);
// SchedulerNodeReport is the node usage report.
return node == null ? null : new SchedulerNodeReport(node);
}

// Moves the given application to the given queue (not supported by this base class).
@Override
public String moveApplication(ApplicationId appId, String newQueue)
throws YarnException {
throw new YarnException(getClass().getSimpleName()
+ " does not support moving apps between queues");
}

// Removes an existing queue.
public void removeQueue(String queueName) throws YarnException {
throw new YarnException(getClass().getSimpleName()
+ " does not support removing queues");
}

// Adds a new queue to the scheduler.
@Override
public void addQueue(Queue newQueue) throws YarnException {
throw new YarnException(getClass().getSimpleName()
+ " does not support this operation");
}

// Sets the entitlement of the given queue.
@Override
public void setEntitlement(String queue, QueueEntitlement entitlement)
throws YarnException {
throw new YarnException(getClass().getSimpleName()
+ " does not support this operation");
}

// Kills an orphaned container on a node.
private void killOrphanContainerOnNode(RMNode node,
NMContainerStatus container) {
// getContainerState() returns the container's state.
// ContainerState has three values: NEW, RUNNING and COMPLETE (a finished container).
if (!container.getContainerState().equals(ContainerState.COMPLETE)) {
// Same cleanup dispatch as in containerLaunchedOnNode() above.
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeCleanContainerEvent(node.getNodeID(),
container.getContainerId()));
}
}

// Recovers the containers reported by a node (work-preserving recovery).
public synchronized void recoverContainersOnNode(
List<NMContainerStatus> containerReports, RMNode nm) {
if (!rmContext.isWorkPreservingRecoveryEnabled()
|| containerReports == null
|| (containerReports != null && containerReports.isEmpty())) {
return;
}

for (NMContainerStatus container : containerReports) {
// Resolve ContainerId -> ApplicationAttemptId -> ApplicationId.
ApplicationId appId =
container.getContainerId().getApplicationAttemptId().getApplicationId();
RMApp rmApp = rmContext.getRMApps().get(appId);
if (rmApp == null) {
LOG.error("Skip recovering container " + container
+ " for unknown application.");
killOrphanContainerOnNode(nm, container);
continue;
}

// Unmanaged AM recovery is addressed in YARN-1815
// getUnmanagedAM() is true when the RM neither allocates a container
// for the AM nor launches it.
if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) {
LOG.info("Skip recovering container " + container + " for unmanaged AM."
+ rmApp.getApplicationId());
killOrphanContainerOnNode(nm, container);
continue;
}

SchedulerApplication<T> schedulerApp = applications.get(appId);
if (schedulerApp == null) {
// rmApp.getState() is the current state of the RMApp.
LOG.info("Skip recovering container " + container
+ " for unknown SchedulerApplication. Application current state is "
+ rmApp.getState());
killOrphanContainerOnNode(nm, container);
continue;
}

LOG.info("Recovering container " + container);
SchedulerApplicationAttempt schedulerAttempt =
schedulerApp.getCurrentAppAttempt();

// getKeepContainersAcrossApplicationAttempts() is the flag that tells whether
// containers are kept across application attempts.
if (!rmApp.getApplicationSubmissionContext()
.getKeepContainersAcrossApplicationAttempts()) {
// Do not recover containers for stopped attempt or previous attempt.
if (schedulerAttempt.isStopped()
|| !schedulerAttempt.getApplicationAttemptId().equals(
container.getContainerId().getApplicationAttemptId())) {
LOG.info("Skip recovering container " + container
+ " for already stopped attempt.");
killOrphanContainerOnNode(nm, container);
continue;
}
}

// create container
RMContainer rmContainer = recoverAndCreateContainer(container, nm);

// recover RMContainer
rmContainer.handle(new RMContainerRecoverEvent(container.getContainerId(),
container));

// recover scheduler node
nodes.get(nm.getNodeID()).recoverContainer(rmContainer);

// recover queue: update headroom etc.
Queue queue = schedulerAttempt.getQueue();
queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer);

// recover scheduler attempt
schedulerAttempt.recoverContainer(rmContainer);

// set master container for the current running AMContainer for this
// attempt.
RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt();
if (appAttempt != null) {
// getMasterContainer() is the container the ApplicationMaster runs in.
Container masterContainer = appAttempt.getMasterContainer();

// Mark current running AMContainer's RMContainer based on the master
// container ID stored in AppAttempt.
if (masterContainer != null
&& masterContainer.getId().equals(rmContainer.getContainerId())) {
((RMContainerImpl)rmContainer).setAMContainer(true);
}
}

synchronized (schedulerAttempt) {
// pendingRelease is used in the work-preserving recovery scenario to keep
// track of the AM's outstanding release requests. During recovery the RM may
// receive a release request from the AM before it receives the container
// status from the NM. In that case the to-be-recovered container reported
// by the NM should not be recovered.
Set<ContainerId> releases = schedulerAttempt.getPendingRelease();
if (releases.contains(container.getContainerId())) {
// release the container
rmContainer.handle(new RMContainerFinishedEvent(container
.getContainerId(), SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER),
RMContainerEventType.RELEASED));
releases.remove(container.getContainerId());
LOG.info(container.getContainerId() + " is released by application.");
}
}
}
}

// Recovers a container by rebuilding its RMContainer.
// NMContainerStatus carries the container's current information.
// RMNode holds the NodeManager's available resources and other static information.
private RMContainer recoverAndCreateContainer(NMContainerStatus status,
RMNode node) {
// Create the Container instance.
Container container =
Container.newInstance(status.getContainerId(), node.getNodeID(),
node.getHttpAddress(), status.getAllocatedResource(),
status.getPriority(), null);
// The application attempt id that owns the container.
ApplicationAttemptId attemptId =
container.getId().getApplicationAttemptId();
// Wrap it in an RMContainerImpl.
RMContainer rmContainer =
new RMContainerImpl(container, attemptId, node.getNodeID(),
applications.get(attemptId.getApplicationId()).getUser(), rmContext,
status.getCreationTime());
return rmContainer;
}

/**
* Recover resource request back from RMContainer when a container is
* preempted before AM pulled the same. If container is pulled by
* AM, then RMContainer will not have resource request to recover.
* @param rmContainer
*/
protected void recoverResourceRequestForContainer(RMContainer rmContainer) {
List<ResourceRequest> requests = rmContainer.getResourceRequests();

// If container state is moved to ACQUIRED, request will be empty.
if (requests == null) {
return;
}
// Add resource request back to Scheduler.
SchedulerApplicationAttempt schedulerAttempt
= getCurrentAttemptForContainer(rmContainer.getContainerId());
if (schedulerAttempt != null) {
schedulerAttempt.recoverResourceRequests(requests);
}
}

protected void createReleaseCache() {
// Cleanup the cache after nm expire interval.
// Timer.schedule(task, delay) runs the task once after the given delay.
new Timer().schedule(new TimerTask() {
@Override
public void run() {
// Map.values() returns a Collection view of the map's values.
for (SchedulerApplication<T> app : applications.values()) {
// Current attempt of this application.
T attempt = app.getCurrentAppAttempt();
synchronized (attempt) {
// Every release request still pending here is reported as a failed release.
for (ContainerId containerId : attempt.getPendingRelease()) {
// logFailure() writes a readable and parseable audit log entry for a failed event.
RMAuditLogger.logFailure(
app.getUser(),
AuditConstants.RELEASE_CONTAINER,
"Unauthorized access or invalid container",
"Scheduler",
"Trying to release container not owned by app or with invalid id.",
attempt.getApplicationId(), containerId);
}
// Empty the pending-release set.
attempt.getPendingRelease().clear();
}
}
LOG.info("Release request cache is cleaned up");
}
}, nmExpireInterval);
}

// clean up a completed container
protected abstract void completedContainer(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event);

// Releases the given containers on behalf of an application attempt.
protected void releaseContainers(List<ContainerId> containers,
SchedulerApplicationAttempt attempt) {
for (ContainerId containerId : containers) {
// Look up the RMContainer for this containerId.
RMContainer rmContainer = getRMContainer(containerId);
if (rmContainer == null) {
// Less than nmExpireInterval has passed since the cluster timestamp, so the
// container may still be recovering: cache the request instead of rejecting it.
if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
< nmExpireInterval) {
LOG.info(containerId + " doesn't exist. Add the container"
+ " to the release request cache as it maybe on recovery.");
synchronized (attempt) {
// Set.add() leaves the set unchanged if the id is already present.
attempt.getPendingRelease().add(containerId);
}
} else {
// logFailure() writes a readable and parseable audit log entry for a failed event.
RMAuditLogger.logFailure(attempt.getUser(),
AuditConstants.RELEASE_CONTAINER,
"Unauthorized access or invalid container", "Scheduler",
"Trying to release container not owned by app or with invalid id.",
attempt.getApplicationId(), containerId);
}
}
// Clean up the container. createAbnormalContainerStatus() is a utility that
// builds a ContainerStatus for special cases such as a release.
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(containerId,
SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED);
}
}

// Returns the SchedulerNode for the given node id.
// SchedulerNode represents a YARN cluster node from the scheduler's point of view.
public SchedulerNode getSchedulerNode(NodeId nodeId) {
// Map.get() returns null when the node is unknown.
return nodes.get(nodeId);
}

// Completely drains sourceQueue of applications by moving all of them to destQueue.
@Override
public synchronized void moveAllApps(String sourceQueue, String destQueue)
throws YarnException {
// check if destination queue is a valid leaf queue
try {
getQueueInfo(destQueue, false, false);
} catch (IOException e) {
LOG.warn(e);
throw new YarnException(e);
}
// check if source queue is a valid
// getAppsInQueue() returns the application attempts under the given queue.
List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue);
if (apps == null) {
String errMsg = "The specified Queue: " + sourceQueue + " doesn't exist";
LOG.warn(errMsg);
throw new YarnException(errMsg);
}
// generate move events for each pending/running app
for (ApplicationAttemptId app : apps) {
SettableFuture<Object> future = SettableFuture.create();
// RMAppMoveEvent is constructed with event type RMAppEventType.MOVE.
this.rmContext
.getDispatcher()
.getEventHandler()
.handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future));
}
}

// Kills all applications in the given queue.
@Override
public synchronized void killAllAppsInQueue(String queueName)
throws YarnException {
// check if queue is a valid
// getAppsInQueue() returns the application attempts under the given queue.
List<ApplicationAttemptId> apps = getAppsInQueue(queueName);
if (apps == null) {
String errMsg = "The specified Queue: " + queueName + " doesn't exist";
LOG.warn(errMsg);
throw new YarnException(errMsg);
}
// generate kill events for each pending/running app
for (ApplicationAttemptId app : apps) {
this.rmContext
.getDispatcher()
.getEventHandler()
.handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL,
"Application killed due to expiry of reservation queue " +
queueName + "."));
}
}

/**
* Process resource update on a node.
*/
public synchronized void updateNodeResource(RMNode nm,
ResourceOption resourceOption) {
SchedulerNode node = getSchedulerNode(nm.getNodeID());
Resource newResource = resourceOption.getResource();
// Total resource currently recorded for the node.
Resource oldResource = node.getTotalResource();
if(!oldResource.equals(newResource)) {
// Log resource change
LOG.info("Update resource on node: " + node.getNodeName()
+ " from: " + oldResource + ", to: "
+ newResource);

// Remove the node from the map, then drop its old size from the
// maximum-allocation bookkeeping.
nodes.remove(nm.getNodeID());
updateMaximumAllocation(node, false);

// update resource to node
node.setTotalResource(newResource);

// Put the node back (replacing any previous mapping), then account for its new size.
nodes.put(nm.getNodeID(), (N)node);
updateMaximumAllocation(node, true);

// update resource to clusterResource
// subtractFrom() and addTo() adjust both memory and virtual cores of clusterResource.
Resources.subtractFrom(clusterResource, oldResource);
Resources.addTo(clusterResource, newResource);
} else {
// Log resource change
LOG.warn("Update resource on node: " + node.getNodeName()
+ " with the same resource: " + newResource);
}
}

/** {@inheritDoc} */
// Returns the set of resource types considered when scheduling.
@Override
public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() {
// EnumSet.of() creates an enum set initially containing the given element;
// this base implementation considers memory only.
return EnumSet.of(SchedulerResourceTypes.MEMORY);
}

// Returns the names of the queues managed by the reservation system.
@Override
public Set<String> getPlanQueues() throws YarnException {
// getClass().getSimpleName() returns the simple name of the runtime class,
// i.e. the concrete scheduler (an empty string for an anonymous class).
throw new YarnException(getClass().getSimpleName()
+ " does not support reservations");
}

// Updates the maximum allocation after a node is added or removed.
protected void updateMaximumAllocation(SchedulerNode node, boolean add) {
// Total resource of the node being added or removed.
Resource totalResource = node.getTotalResource();
maxAllocWriteLock.lock();
try {
if (add) { // added node
int nodeMemory = totalResource.getMemory();
if (nodeMemory > maxNodeMemory) {
maxNodeMemory = nodeMemory;
// The maximum allocation is the smaller of the configured maximum
// and the largest node's memory.
maximumAllocation.setMemory(Math.min(
configuredMaximumAllocation.getMemory(), maxNodeMemory));
}
int nodeVCores = totalResource.getVirtualCores();
if (nodeVCores > maxNodeVCores) {
maxNodeVCores = nodeVCores;
maximumAllocation.setVirtualCores(Math.min(
configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
}
} else { // removed node
if (maxNodeMemory == totalResource.getMemory()) {
maxNodeMemory = -1;
}
if (maxNodeVCores == totalResource.getVirtualCores()) {
maxNodeVCores = -1;
}
// We only have to iterate through the nodes if the current max memory
// or vcores was equal to the removed node's
if (maxNodeMemory == -1 || maxNodeVCores == -1) {
// entrySet() returns a Set view of the map's key-value entries.
for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) {
int nodeMemory =
nodeEntry.getValue().getTotalResource().getMemory();
if (nodeMemory > maxNodeMemory) {
maxNodeMemory = nodeMemory;
}
int nodeVCores =
nodeEntry.getValue().getTotalResource().getVirtualCores();
if (nodeVCores > maxNodeVCores) {
maxNodeVCores = nodeVCores;
}
}
if (maxNodeMemory == -1) { // no nodes
maximumAllocation.setMemory(configuredMaximumAllocation.getMemory());
} else {
maximumAllocation.setMemory(
Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory));
}
if (maxNodeVCores == -1) { // no nodes
maximumAllocation.setVirtualCores(configuredMaximumAllocation.getVirtualCores());
} else {
maximumAllocation.setVirtualCores(
Math.min(configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
}
}
}
} finally {
maxAllocWriteLock.unlock();
}
}

// Refreshes the maximum allocation after the configured maximum changes.
protected void refreshMaximumAllocation(Resource newMaxAlloc) {
maxAllocWriteLock.lock();
try {
configuredMaximumAllocation = Resources.clone(newMaxAlloc);
int maxMemory = newMaxAlloc.getMemory();
if (maxNodeMemory != -1) {
maxMemory = Math.min(maxMemory, maxNodeMemory);
}
int maxVcores = newMaxAlloc.getVirtualCores();
if (maxNodeVCores != -1) {
maxVcores = Math.min(maxVcores, maxNodeVCores);
}
// Build the new effective maximum from the capped values.
maximumAllocation = Resources.createResource(maxMemory, maxVcores);
} finally {
maxAllocWriteLock.unlock();
}
}

// Returns the pending resource requests of the given application attempt.
public List<ResourceRequest> getPendingResourceRequestsForAttempt(
ApplicationAttemptId attemptId) {
// Look up the application attempt.
SchedulerApplicationAttempt attempt = getApplicationAttempt(attemptId);
if (attempt != null) {
// getAppSchedulingInfo() returns the attempt's scheduling info;
// getAllResourceRequests() returns all of its resource requests.
return attempt.getAppSchedulingInfo().getAllResourceRequests();
}
return null;
}
}
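The bookkeeping in updateMaximumAllocation() above is easier to follow in isolation. Below is a minimal sketch of the same idea using only the JDK and tracking memory only. The class MaxAllocationTracker, its fields, and the node names are hypothetical and exist purely for illustration; this is not Hadoop code, and it leaves out the read/write lock and the vcores dimension that the real method handles.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the memory half of updateMaximumAllocation().
// Class, field, and method names are illustrative; this is not Hadoop code.
class MaxAllocationTracker {
  private final int configuredMax;            // role of configuredMaximumAllocation
  private final Map<String, Integer> nodeMemory = new HashMap<>();
  private int maxNodeMemory = -1;             // -1 means no node is registered
  private int maximumAllocation;

  MaxAllocationTracker(int configuredMax) {
    this.configuredMax = configuredMax;
    this.maximumAllocation = configuredMax;
  }

  // Register a node; only a new largest node can change the maximum.
  void addNode(String name, int memory) {
    nodeMemory.put(name, memory);
    if (memory > maxNodeMemory) {
      maxNodeMemory = memory;
      maximumAllocation = Math.min(configuredMax, maxNodeMemory);
    }
  }

  // Unregister a node; rescan the remaining nodes only if it was the largest.
  void removeNode(String name) {
    Integer removed = nodeMemory.remove(name);
    if (removed == null || removed.intValue() != maxNodeMemory) {
      return;
    }
    maxNodeMemory = -1;
    for (int memory : nodeMemory.values()) {
      maxNodeMemory = Math.max(maxNodeMemory, memory);
    }
    maximumAllocation = (maxNodeMemory == -1)
        ? configuredMax
        : Math.min(configuredMax, maxNodeMemory);
  }

  int getMaximumAllocation() {
    return maximumAllocation;
  }

  public static void main(String[] args) {
    MaxAllocationTracker tracker = new MaxAllocationTracker(8192);
    tracker.addNode("node-a", 4096);
    System.out.println(tracker.getMaximumAllocation());
    tracker.addNode("node-b", 16384);
    System.out.println(tracker.getMaximumAllocation());
    tracker.removeNode("node-b");
    System.out.println(tracker.getMaximumAllocation());
  }
}
```

In this sketch the effective maximum is always the smaller of the configured maximum and the largest registered node, and it falls back to the configured value when no nodes remain, which mirrors the "no nodes" branches in the listing.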
Hadoop's three main schedulers are the Fifo, Capacity, and Fair schedulers, shown below:
(1) Fifo scheduler
The corresponding class is FifoScheduler.java, located at hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java.
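One detail of FifoScheduler is worth isolating: its initScheduler() method stores applications in a ConcurrentSkipListMap, with the source comment "because applications need to be ordered". The sketch below illustrates that idea with the JDK alone. The AppKey class is a hypothetical stand-in for ApplicationId, and the assumption that keys order by cluster timestamp and then by sequence number is mine, made for illustration; check ApplicationId.compareTo() in the Hadoop source for the real ordering.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

// Illustrative only: shows how a sorted concurrent map yields FIFO iteration
// when keys sort in submission order. Not Hadoop code.
class FifoOrderSketch {

  // Hypothetical key; assumed to order by timestamp first, then by id.
  static final class AppKey implements Comparable<AppKey> {
    final long clusterTimestamp;
    final int id;

    AppKey(long clusterTimestamp, int id) {
      this.clusterTimestamp = clusterTimestamp;
      this.id = id;
    }

    @Override
    public int compareTo(AppKey other) {
      int byTimestamp = Long.compare(clusterTimestamp, other.clusterTimestamp);
      return byTimestamp != 0 ? byTimestamp : Integer.compare(id, other.id);
    }
  }

  // Values come back in ascending key order, regardless of insertion order.
  static List<String> submissionOrder(ConcurrentSkipListMap<AppKey, String> apps) {
    return new ArrayList<>(apps.values());
  }

  public static void main(String[] args) {
    ConcurrentSkipListMap<AppKey, String> apps = new ConcurrentSkipListMap<>();
    apps.put(new AppKey(1000L, 3), "third");
    apps.put(new AppKey(1000L, 1), "first");
    apps.put(new AppKey(1000L, 2), "second");
    System.out.println(submissionOrder(apps)); // prints [first, second, third]
  }
}
```

Because the map keeps its entries sorted by key, a scheduler that iterates over it visits applications from the lowest key upward without maintaining a separate queue, and the concurrent implementation allows lookups from other threads while that iteration is in progress.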
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

import com.google.common.annotations.VisibleForTesting;

@LimitedPrivate("yarn")
@Evolving
@SuppressWarnings("unchecked")
public class FifoScheduler extends
AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements
Configurable {

private static final Log LOG = LogFactory.getLog(FifoScheduler.class);

private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);

Configuration conf;

private boolean usePortForNodeName;

private ActiveUsersManager activeUsersManager;

private static final String DEFAULT_QUEUE_NAME = "default";
private QueueMetrics metrics;

private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator();

// Create the default queue.
private final Queue DEFAULT_QUEUE = new Queue() {
@Override
public String getQueueName() {
return DEFAULT_QUEUE_NAME;
}

@Override
public QueueMetrics getMetrics() {
return metrics;
}

@Override
public QueueInfo getQueueInfo(
boolean includeChildQueues, boolean recursive) {
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
queueInfo.setQueueName(DEFAULT_QUEUE.getQueueName());
queueInfo.setCapacity(1.0f);
if (clusterResource.getMemory() == 0) {
queueInfo.setCurrentCapacity(0.0f);
} else {
queueInfo.setCurrentCapacity((float) usedResource.getMemory()
/ clusterResource.getMemory());
}
queueInfo.setMaximumCapacity(1.0f);
queueInfo.setChildQueues(new ArrayList<QueueInfo>());
queueInfo.setQueueState(QueueState.RUNNING);
return queueInfo;
}

public Map<QueueACL, AccessControlList> getQueueAcls() {
Map<QueueACL, AccessControlList> acls =
new HashMap<QueueACL, AccessControlList>();
for (QueueACL acl : QueueACL.values()) {
acls.put(acl, new AccessControlList("*"));
}
return acls;
}

@Override
public List<QueueUserACLInfo> getQueueUserAclInfo(
UserGroupInformation unused) {
QueueUserACLInfo queueUserAclInfo =
recordFactory.newRecordInstance(QueueUserACLInfo.class);
queueUserAclInfo.setQueueName(DEFAULT_QUEUE_NAME);
queueUserAclInfo.setUserAcls(Arrays.asList(QueueACL.values()));
return Collections.singletonList(queueUserAclInfo);
}

@Override
public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
return getQueueAcls().get(acl).isUserAllowed(user);
}

@Override
public ActiveUsersManager getActiveUsersManager() {
return activeUsersManager;
}

@Override
public void recoverContainer(Resource clusterResource,
SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
return;
}
increaseUsedResources(rmContainer);
updateAppHeadRoom(schedulerAttempt);
updateAvailableResourcesMetrics();
}

@Override
public Set<String> getAccessibleNodeLabels() {
// TODO add implementation for FIFO scheduler
return null;
}

@Override
public String getDefaultNodeLabelExpression() {
// TODO add implementation for FIFO scheduler
return null;
}
};

public FifoScheduler() {
super(FifoScheduler.class.getName());
}
// Initialize the scheduler
private synchronized void initScheduler(Configuration conf) {
// Validate the configuration
validateConf(conf);
// Use ConcurrentSkipListMap because applications need to be ordered.
// The applications field is declared in the abstract parent class AbstractYarnScheduler.
this.applications =
new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
// createResource() builds a Resource object; getInt() reads the named property as an int.
// DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB = 1024
this.minimumAllocation =
Resources.createResource(conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
// Initialize the maximum resource capability.
// DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB = 8192
// DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES = 4
initMaximumResourceCapability(
Resources.createResource(conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB),
conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES)));
// getBoolean() reads the named property as a boolean.
// DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME = false
this.usePortForNodeName = conf.getBoolean(
YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
// Create the metrics for the default queue.
this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false,
conf);
// ActiveUsersManager tracks the active users in the system.
this.activeUsersManager = new ActiveUsersManager(metrics);
} @Override
public void serviceInit(Configuration conf) throws Exception {
// Initialize the scheduler
initScheduler(conf);
super.serviceInit(conf);
} @Override
public void serviceStart() throws Exception {
super.serviceStart();
} @Override
public void serviceStop() throws Exception {
super.serviceStop();
} @Override
public synchronized void setConf(Configuration conf) {
this.conf = conf;
}
// Validate the configuration
private void validateConf(Configuration conf) {
// Validate the scheduler memory allocation settings.
// DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB = 1024
int minMem = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
// DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB = 8192
int maxMem = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); if (minMem <= 0 || minMem > maxMem) {
throw new YarnRuntimeException("Invalid resource scheduler memory"
+ " allocation configuration"
+ ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
+ "=" + minMem
+ ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB
+ "=" + maxMem + ", min and max should be greater than 0"
+ ", max should be no smaller than min.");
}
} @Override
public synchronized Configuration getConf() {
return conf;
} @Override
public int getNumClusterNodes() {
// nodes is a Map; size() returns the number of key-value mappings, i.e. the number of registered nodes.
return nodes.size();
} @Override
public synchronized void setRMContext(RMContext rmContext) {
this.rmContext = rmContext;
} @Override
public synchronized void
reinitialize(Configuration conf, RMContext rmContext) throws IOException
{
setConf(conf);
} //
@Override
public Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
if (application == null) {
LOG.error("Calling allocate on removed " +
"or non existant application " + applicationAttemptId);
return EMPTY_ALLOCATION;
}
// Sanity check
SchedulerUtils.normalizeRequests(ask, resourceCalculator,
clusterResource, minimumAllocation, getMaximumResourceCapability());
// Release containers
releaseContainers(release, application);
synchronized (application) {
// Make sure we aren't stopping/removing the application
// when the allocate comes in
if (application.isStopped()) {
LOG.info("Calling allocate on a stopped " +
"application " + applicationAttemptId);
return EMPTY_ALLOCATION;
} if (!ask.isEmpty()) {
LOG.debug("allocate: pre-update" +
" applicationId=" + applicationAttemptId +
" application=" + application);
// Log the outstanding requests before the update
application.showRequests();
// Update application requests
application.updateResourceRequests(ask);
LOG.debug("allocate: post-update" +
" applicationId=" + applicationAttemptId +
" application=" + application);
application.showRequests(); LOG.debug("allocate:" +
" applicationId=" + applicationAttemptId +
" #ask=" + ask.size());
}
// Update the blacklist
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
// Create the container tokens and NMTokens together. If either fails for some
// reason (such as DNS being unavailable), do not return the container; keep it
// in newlyAllocatedContainers, waiting to be refetched.
ContainersAndNMTokensAllocation allocation =
application.pullNewlyAllocatedContainersAndNMTokens();
// Headroom: the resources still available to this application's user.
Resource headroom = application.getHeadroom();
// Publish the headroom to the metrics.
application.setApplicationHeadroomForMetrics(headroom);
// Return the new containers, the headroom, and the NMTokens.
return new Allocation(allocation.getContainerList(), headroom, null,
null, null, allocation.getNMTokenList());
}
} private FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
} @VisibleForTesting
public synchronized void addApplication(ApplicationId applicationId,
String queue, String user, boolean isAppRecovering) {
// Create the scheduler-side record of the application in the default queue
SchedulerApplication<FiCaSchedulerApp> application =
new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user);
// Register the application under its id
applications.put(applicationId, application);
// Count the submission in the queue metrics
metrics.submitApp(user);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", currently num of applications: " + applications.size());
if (isAppRecovering) {
if (LOG.isDebugEnabled()) {
LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
}
} else {
rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
}
} @VisibleForTesting
public synchronized void
addApplicationAttempt(ApplicationAttemptId appAttemptId,
boolean transferStateFromPreviousAttempt,
boolean isAttemptRecovering) {
// Look up the application; Map.get() returns null if the map has no mapping for the key.
SchedulerApplication<FiCaSchedulerApp> application =
applications.get(appAttemptId.getApplicationId());
String user = application.getUser();
// TODO: Fix store
// Create a FiCaSchedulerApp, which represents an application attempt from the viewpoint of the FIFO or Capacity scheduler.
FiCaSchedulerApp schedulerApp =
new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE,
activeUsersManager, this.rmContext); if (transferStateFromPreviousAttempt) {
schedulerApp.transferStateFromPreviousAttempt(application
.getCurrentAppAttempt());
}
application.setCurrentAppAttempt(schedulerApp); metrics.submitAppAttempt(user);
LOG.info("Added Application Attempt " + appAttemptId
+ " to scheduler from user " + application.getUser());
if (isAttemptRecovering) {
if (LOG.isDebugEnabled()) {
LOG.debug(appAttemptId
+ " is recovering. Skipping notifying ATTEMPT_ADDED");
}
} else {
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(appAttemptId,
RMAppAttemptEventType.ATTEMPT_ADDED));
}
} private synchronized void doneApplication(ApplicationId applicationId,
RMAppState finalState) {
SchedulerApplication<FiCaSchedulerApp> application =
applications.get(applicationId);
if (application == null){
LOG.warn("Couldn't find application " + applicationId);
return;
}
// Inform the activeUsersManager
activeUsersManager.deactivateApplication(application.getUser(),
applicationId);
application.stop(finalState);
// Remove the application from the map (Map.remove() does nothing if the key is absent).
applications.remove(applicationId);
} private synchronized void doneApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers)
throws IOException {
FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
SchedulerApplication<FiCaSchedulerApp> application =
applications.get(applicationAttemptId.getApplicationId());
if (application == null || attempt == null) {
throw new IOException("Unknown application " + applicationAttemptId +
" has completed!");
}
// Kill all 'live' containers
for (RMContainer container : attempt.getLiveContainers()) {
if (keepContainers
&& container.getState().equals(RMContainerState.RUNNING)) {
// Do not kill the running container in the case of work-preserving AM
// restart.
LOG.info("Skip killing " + container.getContainerId());
continue;
}
// createAbnormalContainerStatus() is a utility that creates a ContainerStatus for exceptional circumstances.
completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
RMContainerEventType.KILL);
}
// Clean up pending requests, metrics etc.
attempt.stop(rmAppAttemptFinalState);
}
/**
* Heart of the scheduler...
*
* @param node node on which resources are available to be allocated
*/
private void assignContainers(FiCaSchedulerNode node) {
LOG.debug("assignContainers:" +
" node=" + node.getRMNode().getNodeAddress() +
" #applications=" + applications.size());
// Try to assign containers to applications in FIFO order
for (Map.Entry<ApplicationId, SchedulerApplication<FiCaSchedulerApp>> e : applications
.entrySet()) {
FiCaSchedulerApp application = e.getValue().getCurrentAppAttempt();
if (application == null) {
continue;
} LOG.debug("pre-assignContainers");
application.showRequests();
synchronized (application) {
// Check if this node is on the application's blacklist
if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
continue;
} for (Priority priority : application.getPriorities()) {
// Maximum number of containers that can be allocated at this priority
int maxContainers =
getMaxAllocatableContainers(application, priority, node,
NodeType.OFF_SWITCH);
// Ensure the application needs containers of this priority
if (maxContainers > 0) {
// Assign containers on this node
int assignedContainers =
assignContainersOnNode(node, application, priority);
// Do not assign out of order w.r.t. priorities
if (assignedContainers == 0) {
break;
}
}
}
} LOG.debug("post-assignContainers");
application.showRequests();
// Done: stop once the node cannot fit another minimum allocation
if (Resources.lessThan(resourceCalculator, clusterResource,
node.getAvailableResource(), minimumAllocation)) {
break;
}
}
// Update the applications' headroom to correctly take into
// account the containers assigned in this update.
for (SchedulerApplication<FiCaSchedulerApp> application : applications.values()) {
// Current attempt of this application, if any
FiCaSchedulerApp attempt =
(FiCaSchedulerApp) application.getCurrentAppAttempt();
if (attempt == null) {
continue;
}
// Recompute the headroom as cluster resources minus used resources
updateAppHeadRoom(attempt);
}
} private int getMaxAllocatableContainers(FiCaSchedulerApp application,
Priority priority, FiCaSchedulerNode node, NodeType type) {
int maxContainers = 0; ResourceRequest offSwitchRequest =
application.getResourceRequest(priority, ResourceRequest.ANY);
if (offSwitchRequest != null) {
maxContainers = offSwitchRequest.getNumContainers();
}
// Locality levels: NODE_LOCAL(0) is the same node, RACK_LOCAL(1) is a different node on the same rack, OFF_SWITCH(2) is a different rack.
if (type == NodeType.OFF_SWITCH) {
return maxContainers;
} if (type == NodeType.RACK_LOCAL) {
// getResourceRequest() looks up the request for a resource name; getRMNode() returns the RMNode object; getRackName() returns the rack name of this node manager.
ResourceRequest rackLocalRequest =
application.getResourceRequest(priority, node.getRMNode().getRackName());
if (rackLocalRequest == null) {
return maxContainers;
} maxContainers = Math.min(maxContainers, rackLocalRequest.getNumContainers());
} if (type == NodeType.NODE_LOCAL) {
// getResourceRequest() looks up the request for a resource name; getNodeAddress() returns the ContainerManager address of this node.
ResourceRequest nodeLocalRequest =
application.getResourceRequest(priority, node.getRMNode().getNodeAddress());
if (nodeLocalRequest != null) {
// getNumContainers() returns the number of containers requested with this specification.
maxContainers = Math.min(maxContainers, nodeLocalRequest.getNumContainers());
}
} return maxContainers;
}
// Assign containers on a node
private int assignContainersOnNode(FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority
) {
// Data-local
int nodeLocalContainers =
assignNodeLocalContainers(node, application, priority);
// Rack-local
int rackLocalContainers =
assignRackLocalContainers(node, application, priority);
// Off-switch (a different rack)
int offSwitchContainers =
assignOffSwitchContainers(node, application, priority);
LOG.debug("assignContainersOnNode:" +
" node=" + node.getRMNode().getNodeAddress() +
" application=" + application.getApplicationId().getId() +
" priority=" + priority.getPriority() +
" #assigned=" +
(nodeLocalContainers + rackLocalContainers + offSwitchContainers)); return (nodeLocalContainers + rackLocalContainers + offSwitchContainers);
}
// Assign node-local containers
private int assignNodeLocalContainers(FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority) {
int assignedContainers = 0;
// getNodeName() returns the node name used for scheduling matching decisions.
ResourceRequest request =
application.getResourceRequest(priority, node.getNodeName());
if (request != null) {
// Don't allocate on this node if we don't need containers on this rack.
// getRackName() returns the rack name of this node manager.
ResourceRequest rackRequest =
application.getResourceRequest(priority,
node.getRMNode().getRackName());
// getNumContainers() returns the number of containers requested with this specification.
if (rackRequest == null || rackRequest.getNumContainers() <= 0) {
return 0;
} int assignableContainers =
Math.min(
getMaxAllocatableContainers(application, priority, node,
NodeType.NODE_LOCAL),
request.getNumContainers());
// Assign the containers
assignedContainers =
assignContainer(node, application, priority,
assignableContainers, request, NodeType.NODE_LOCAL);
}
return assignedContainers;
}
// Assign rack-local containers
private int assignRackLocalContainers(FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority) {
int assignedContainers = 0;
ResourceRequest request =
application.getResourceRequest(priority, node.getRMNode().getRackName());
if (request != null) {
// Don't allocate on this rack if the application doesn't need containers
ResourceRequest offSwitchRequest =
application.getResourceRequest(priority, ResourceRequest.ANY);
if (offSwitchRequest.getNumContainers() <= 0) {
return 0;
} int assignableContainers =
Math.min(
getMaxAllocatableContainers(application, priority, node,
NodeType.RACK_LOCAL),
request.getNumContainers());
// Assign the containers
assignedContainers =
assignContainer(node, application, priority,
assignableContainers, request, NodeType.RACK_LOCAL);
}
return assignedContainers;
}
// Assign off-switch containers (across racks)
private int assignOffSwitchContainers(FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority) {
int assignedContainers = 0;
ResourceRequest request =
application.getResourceRequest(priority, ResourceRequest.ANY);
if (request != null) {
assignedContainers =
assignContainer(node, application, priority,
request.getNumContainers(), request, NodeType.OFF_SWITCH);
}
return assignedContainers;
}
// Assign containers
private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application,
Priority priority, int assignableContainers,
ResourceRequest request, NodeType type) {
LOG.debug("assignContainers:" +
" node=" + node.getRMNode().getNodeAddress() +
" application=" + application.getApplicationId().getId() +
" priority=" + priority.getPriority() +
" assignableContainers=" + assignableContainers +
" request=" + request + " type=" + type);
// Capability (Resource size) of each requested container
Resource capability = request.getCapability();
// TODO: A buggy application with this zero would crash the scheduler.
int availableContainers =
node.getAvailableResource().getMemory() / capability.getMemory();
int assignedContainers =
Math.min(assignableContainers, availableContainers);
if (assignedContainers > 0) {
for (int i=0; i < assignedContainers; ++i) {
// getNodeID() returns the id of this node.
NodeId nodeId = node.getRMNode().getNodeID();
ContainerId containerId = BuilderUtils.newContainerId(application
.getApplicationAttemptId(), application.getNewContainerId());
// Create the container
Container container =
BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
.getHttpAddress(), capability, priority, null);
// Allocate!
// Inform the application
RMContainer rmContainer =
application.allocate(type, node, priority, request, container);
// Inform the node
node.allocateContainer(rmContainer);
// Update usage for this container
increaseUsedResources(rmContainer);
}
}
return assignedContainers;
} private synchronized void nodeUpdate(RMNode rmNode) {
FiCaSchedulerNode node = getNode(rmNode.getNodeID());
// Fetch and clear the list of containerUpdates accumulated across NM heartbeats.
List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
for(UpdatedContainerInfo containerInfo : containerInfoList) {
// Collect the containers that were newly launched
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
// Collect the containers that have completed
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Process the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
// Process completed containers
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
} if (rmContext.isWorkPreservingRecoveryEnabled()
&& !rmContext.isSchedulerReadyForAllocatingContainers()) {
return;
} if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
node.getAvailableResource(),minimumAllocation)) {
LOG.debug("Node heartbeat " + rmNode.getNodeID() +
" available resource = " + node.getAvailableResource());
// The node can fit at least one minimum allocation, so try to assign containers
assignContainers(node);
LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
+ node.getAvailableResource());
}
updateAvailableResourcesMetrics();
}
// Increase the used resources
private void increaseUsedResources(RMContainer rmContainer) {
// addTo() adds the second resource to the first, in place
Resources.addTo(usedResource, rmContainer.getAllocatedResource());
}
private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) {
// Headroom = cluster resources minus used resources
schedulerAttempt.setHeadroom(Resources.subtract(clusterResource,
usedResource));
}
private void updateAvailableResourcesMetrics() {
// Set the available resources; called periodically by the scheduler as resources become available.
metrics.setAvailableResourcesToQueue(Resources.subtract(clusterResource,
usedResource));
} @Override
public void handle(SchedulerEvent event) {
switch(event.getType()) {
case NODE_ADDED:
{
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
addNode(nodeAddedEvent.getAddedRMNode());
recoverContainersOnNode(nodeAddedEvent.getContainerReports(),
nodeAddedEvent.getAddedRMNode()); }
break;
case NODE_REMOVED:
{
NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
removeNode(nodeRemovedEvent.getRemovedRMNode());
}
break;
case NODE_RESOURCE_UPDATE:
{
NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
(NodeResourceUpdateSchedulerEvent)event;
updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
nodeResourceUpdatedEvent.getResourceOption());
}
break;
case NODE_UPDATE:
{
NodeUpdateSchedulerEvent nodeUpdatedEvent =
(NodeUpdateSchedulerEvent)event;
nodeUpdate(nodeUpdatedEvent.getRMNode());
}
break;
case APP_ADDED:
{
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
addApplication(appAddedEvent.getApplicationId(),
appAddedEvent.getQueue(), appAddedEvent.getUser(),
appAddedEvent.getIsAppRecovering());
}
break;
case APP_REMOVED:
{
AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
doneApplication(appRemovedEvent.getApplicationID(),
appRemovedEvent.getFinalState());
}
break;
case APP_ATTEMPT_ADDED:
{
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
appAttemptAddedEvent.getIsAttemptRecovering());
}
break;
case APP_ATTEMPT_REMOVED:
{
AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
(AppAttemptRemovedSchedulerEvent) event;
try {
doneApplicationAttempt(
appAttemptRemovedEvent.getApplicationAttemptID(),
appAttemptRemovedEvent.getFinalAttemptState(),
appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
} catch(IOException ie) {
LOG.error("Unable to remove application "
+ appAttemptRemovedEvent.getApplicationAttemptID(), ie);
}
}
break;
case CONTAINER_EXPIRED:
{
ContainerExpiredSchedulerEvent containerExpiredEvent =
(ContainerExpiredSchedulerEvent) event;
ContainerId containerid = containerExpiredEvent.getContainerId();
completedContainer(getRMContainer(containerid),
SchedulerUtils.createAbnormalContainerStatus(
containerid,
SchedulerUtils.EXPIRED_CONTAINER),
RMContainerEventType.EXPIRE);
}
break;
case CONTAINER_RESCHEDULED:
{
ContainerRescheduledEvent containerRescheduledEvent =
(ContainerRescheduledEvent) event;
RMContainer container = containerRescheduledEvent.getContainer();
recoverResourceRequestForContainer(container);
}
break;
default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}
}
// Clean up a completed container
@Lock(FifoScheduler.class)
@Override
protected synchronized void completedContainer(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
if (rmContainer == null) {
LOG.info("Null container completed...");
return;
}
// Get the application for the finished container
Container container = rmContainer.getContainer();
// Look up the current application attempt by container id
FiCaSchedulerApp application =
getCurrentAttemptForContainer(container.getId());
ApplicationId appId =
container.getId().getApplicationAttemptId().getApplicationId();
// Get the node on which the container was allocated
FiCaSchedulerNode node = getNode(container.getNodeId());
if (application == null) {
LOG.info("Unknown application: " + appId +
" released container " + container.getId() +
" on node: " + node +
" with event: " + event);
return;
}
// Inform the application
application.containerCompleted(rmContainer, containerStatus, event);
// Inform the node, which releases the container allocated on it
node.releaseContainer(container);
// Update total usage
Resources.subtractFrom(usedResource, container.getResource());
LOG.info("Application attempt " + application.getApplicationAttemptId() +
" released container " + container.getId() +
" on node: " + node +
" with event: " + event);
}
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
// Remove a node
private synchronized void removeNode(RMNode nodeInfo) {
FiCaSchedulerNode node = getNode(nodeInfo.getNodeID());
if (node == null) {
return;
}
// Kill running containers
for(RMContainer container : node.getRunningContainers()) {
completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(),
SchedulerUtils.LOST_CONTAINER),
RMContainerEventType.KILL);
}
// Remove the node
this.nodes.remove(nodeInfo.getNodeID());
updateMaximumAllocation(node, false);
// Update cluster metrics
Resources.subtractFrom(clusterResource, node.getTotalResource());
} @Override
public QueueInfo getQueueInfo(String queueName,
boolean includeChildQueues, boolean recursive) {
return DEFAULT_QUEUE.getQueueInfo(false, false);
} @Override
public List<QueueUserACLInfo> getQueueUserAclInfo() {
return DEFAULT_QUEUE.getQueueUserAclInfo(null);
} @Override
public ResourceCalculator getResourceCalculator() {
return resourceCalculator;
}
// Add a node
private synchronized void addNode(RMNode nodeManager) {
FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
usePortForNodeName);
this.nodes.put(nodeManager.getNodeID(), schedulerNode);
Resources.addTo(clusterResource, schedulerNode.getTotalResource());
updateMaximumAllocation(schedulerNode, true);
} @Override
public void recover(RMState state) {
// NOT IMPLEMENTED
} @Override
public RMContainer getRMContainer(ContainerId containerId) {
FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
return (attempt == null) ? null : attempt.getRMContainer(containerId);
} @Override
public QueueMetrics getRootQueueMetrics() {
return DEFAULT_QUEUE.getMetrics();
}
// Check whether the user is allowed to perform the operation.
@Override
public synchronized boolean checkAccess(UserGroupInformation callerUGI,
QueueACL acl, String queueName) {
return DEFAULT_QUEUE.hasAccess(acl, callerUGI);
} @Override
public synchronized List<ApplicationAttemptId>
getAppsInQueue(String queueName) {
if (queueName.equals(DEFAULT_QUEUE.getQueueName())) {
List<ApplicationAttemptId> attempts =
new ArrayList<ApplicationAttemptId>(applications.size());
for (SchedulerApplication<FiCaSchedulerApp> app : applications.values()) {
attempts.add(app.getCurrentAppAttempt().getApplicationAttemptId());
}
return attempts;
} else {
return null;
}
} public Resource getUsedResource() {
return usedResource;
}
}
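The locality capping done by getMaxAllocatableContainers(...) in the class above is easy to lose among the logging and bookkeeping. The following is a minimal stand-alone sketch of the same rule, under simplifying assumptions: the class LocalityCapSketch, its NodeType enum, and the plain Map of resource name to container count are hypothetical stand-ins, not Hadoop types. Only the "*" wildcard corresponds to ResourceRequest.ANY.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model; not a Hadoop class.
class LocalityCapSketch {
    enum NodeType { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

    // "*" is the resource name that ResourceRequest.ANY stands for.
    static final String ANY = "*";

    /**
     * requests maps a resource name (host, rack, or ANY) to the number of
     * containers still requested at one priority.
     */
    static int maxAllocatable(Map<String, Integer> requests,
                              String host, String rack, NodeType type) {
        // The ANY request is the upper bound for every locality level.
        int max = requests.getOrDefault(ANY, 0);
        if (type == NodeType.OFF_SWITCH) {
            return max;
        }
        if (type == NodeType.RACK_LOCAL) {
            Integer rackCount = requests.get(rack);
            if (rackCount == null) {
                return max;
            }
            max = Math.min(max, rackCount);
        }
        if (type == NodeType.NODE_LOCAL) {
            Integer hostCount = requests.get(host);
            if (hostCount != null) {
                max = Math.min(max, hostCount);
            }
        }
        return max;
    }

    public static void main(String[] args) {
        Map<String, Integer> requests = new HashMap<>();
        requests.put(ANY, 5);
        requests.put("/rack1", 3);
        requests.put("host1", 2);
        for (NodeType t : NodeType.values()) {
            System.out.println(t + " -> "
                + maxAllocatable(requests, "host1", "/rack1", t));
        }
    }
}
```

With the sample requests in main, the caps are 2 for NODE_LOCAL, 3 for RACK_LOCAL, and 5 for OFF_SWITCH. As in the listing, the NODE_LOCAL branch compares only against the ANY count and the host count, not the rack count.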
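Inside the FIFO scheduler, the call chain is as follows. A NODE_UPDATE event handled in handle() triggers nodeUpdate(...), which calls assignContainers(...), which in turn calls assignContainersOnNode(...). That function runs assignNodeLocalContainers(...), assignRackLocalContainers(...), and assignOffSwitchContainers(...) in order. Each of the three starts with getResourceRequest(...) and ends with assignContainer(...); the node-local and rack-local variants also call getMaxAllocatableContainers(...) in between to cap the number of containers.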
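To make the FIFO ordering in that call chain concrete, here is a minimal sketch of one assignment pass over a single node. It is a simplified model, not the Hadoop implementation: FifoSketch, App, and the integer application ids are hypothetical, only memory is considered, and priorities and locality are left out. It keeps the two ideas from the listing: applications are visited in key order of a ConcurrentSkipListMap, and the pass stops once the node has less free memory than the minimum allocation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical, simplified model of one FIFO pass; none of these are Hadoop types.
class FifoSketch {
    /** Pending request of one application: container size in MB (must be > 0) and count. */
    static final class App {
        final int containerMb;
        int pending;

        App(int containerMb, int pending) {
            this.containerMb = containerMb;
            this.pending = pending;
        }
    }

    /**
     * Serves applications in ascending id order (the skip list keeps them sorted)
     * until the node has less free memory than the minimum allocation.
     * Returns one "appId:count" entry per application that received containers.
     */
    static List<String> assignContainers(ConcurrentSkipListMap<Integer, App> apps,
                                         int nodeAvailableMb,
                                         int minimumAllocationMb) {
        List<String> assigned = new ArrayList<>();
        for (Map.Entry<Integer, App> e : apps.entrySet()) {
            App app = e.getValue();
            // How many containers of this size still fit on the node.
            int fit = nodeAvailableMb / app.containerMb;
            int n = Math.min(app.pending, fit);
            if (n > 0) {
                app.pending -= n;
                nodeAvailableMb -= n * app.containerMb;
                assigned.add(e.getKey() + ":" + n);
            }
            // Same stop condition as the listing: the node is effectively full.
            if (nodeAvailableMb < minimumAllocationMb) {
                break;
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        ConcurrentSkipListMap<Integer, App> apps = new ConcurrentSkipListMap<>();
        apps.put(2, new App(1024, 3)); // submitted second
        apps.put(1, new App(2048, 2)); // submitted first, served first
        System.out.println(assignContainers(apps, 5120, 1024));
    }
}
```

In this example the earlier application (id 1) takes two 2048 MB containers first, and the later one (id 2) gets only the single 1024 MB container that still fits, which is the behavior that makes FIFO simple but prone to starving later applications.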
(2) Capacity scheduler
The corresponding class is CapacityScheduler.java, located at hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java.
(3) Fair scheduler
The corresponding class is FairScheduler.java, located at hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java.
All three scheduler classes extend the abstract class AbstractYarnScheduler.
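2 The resource management and job control functions of the JobTracker are split apart and implemented by the ResourceManager and ApplicationMaster components, respectively.
(1) ResourceManager: responsible for allocating resources to all applications
The corresponding class is ResourceManager.java, in
(2) ApplicationMaster: responsible for managing only a single application
The corresponding class is ApplicationMaster.java, in
In addition, NodeManager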
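3 Writing a Hadoop scheduler. See 如何编写Hadoop调度器 (How to Write a Hadoop Scheduler) and 深入Hadoop的调度器 (A Deep Dive into the Hadoop Scheduler).
Suppose we want to write a new scheduler named MyHadoopScheduler. The following work is required:
(1) Classes the user must implement
(2) System classes the user will use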
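Before filling in those classes, it may help to see why a custom scheduler can be plugged in at all: the scheduler class name is read from configuration and instantiated by reflection. The sketch below illustrates that pattern only. It is an assumption-laden stand-in, not ResourceManager code: the Scheduler interface, DefaultScheduler, and the use of java.util.Properties are hypothetical (a real scheduler extends AbstractYarnScheduler and is configured through a Hadoop Configuration). Only the property name yarn.resourcemanager.scheduler.class comes from the text above.

```java
import java.util.Properties;

// Hypothetical stand-ins for illustration; not Hadoop classes.
class SchedulerLoaderSketch {
    // Property name used in yarn-site.xml to select the scheduler.
    static final String KEY = "yarn.resourcemanager.scheduler.class";

    /** Stand-in for the common scheduler base type. */
    interface Scheduler {
        String name();
    }

    /** Stand-in for the built-in default scheduler. */
    static class DefaultScheduler implements Scheduler {
        public String name() {
            return "default";
        }
    }

    /** Stand-in for the user's own scheduler. */
    static class MyHadoopScheduler implements Scheduler {
        public String name() {
            return "my-hadoop-scheduler";
        }
    }

    /** Instantiates the configured scheduler class, falling back to the default. */
    static Scheduler load(Properties conf) {
        String className = conf.getProperty(KEY, DefaultScheduler.class.getName());
        try {
            return Class.forName(className)
                    .asSubclass(Scheduler.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                "Cannot instantiate scheduler " + className, e);
        }
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        System.out.println(load(conf).name());
        conf.setProperty(KEY, MyHadoopScheduler.class.getName());
        System.out.println(load(conf).name());
    }
}
```

The design point is that the loader never names a concrete scheduler; swapping in MyHadoopScheduler is purely a configuration change, which is what makes the scheduler a pluggable module.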