一、涉及到的源码列表:
- Master.scala
- WorkerInfo.scala
- DriverDescription.scala
- RecoveryState.scala
- PersistenceEngine.scala
- MetricsSystem.scala
- ApplicationDescription.scala
- DeployMessage.scala
等等
二、语言叙述(包括截图):
A:主备切换
1.概述: 关键词——(standalong模式,Active Master,两种主备切换模式,具体操作)
spark的standalone模式提供了master的HA,与hadoop一样,一个是active,一个是standby状态。spark HA的主备切换主要基于两种机制:基于文件系统和基于zk集群。前者在挂了后需要手动切换,而基于zk的HA可以自动实现切换。
图 1
2.主备切换的简要流程:关键词——(等待的Master,持久化引擎,如果不为空,重新注册到内存,修改状态,发送master地址,检测过滤,schedule()调度)
图 2
3.
B:注册机制
1. 注册机制:关键词——(注册woker、application,持久化,schedule,在队列中等待调度)
C:状态改变机制
D:资源调度机制(schedule(),目前两种资源调度方法)
三、相应源码查看学习:
- Master.scala
定义一些变量:
// RecoveryState有STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY四个阶段
private var state = RecoveryState.STANDBY
// 持久化引擎
private var persistenceEngine: PersistenceEngine = _
private var leaderElectionAgent: LeaderElectionAgent = _
private var recoveryCompletionTask: ScheduledFuture[_] = _
private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _
// 在有更好的配置内存的方法之前,作为暂时的 workaround ,我们允许用户设置一个标志,
// 这个标志 will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
// 如果不特别指定,就对applications默认使用MaxValue值 (i.e. pass Int.MaxValue)
private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
onStart函数(Inbox.scala中有调用它)
override def onStart(): Unit = {
// 在使用Spark-shell时,常看见的一些信息
logInfo("Starting Spark master at " + masterUrl)
logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
主备切换的一些前奏:
override def receive: PartialFunction[Any, Unit] = {
case ElectedLeader =>
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv) // 读取持久化信息,如果storedApps, storedDrivers, storedWorkers有任意一个是非空的,就重新注册
state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
RecoveryState.ALIVE // 得先重新注册
} else {
RecoveryState.RECOVERING // 状态为RECOVERING才可进行主备切换
}
logInfo("I have been elected leader! New state: " + state)
case RegisterWorker()用于注册worker
case RegisterWorker(
// 获取Woker的信息(此时还没有注册)
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
workerRef.send(MasterInStandby)
} else if (idToWorker.contains(id)) { // 如果id列表中有该id,注册失败
workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
} else { // 注册一个worker,并添加到持久化引擎
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerWebUiUrl)
if (registerWorker(worker)) {
// 同理,注册成功后需要持久化(注册时还会进行一些清理操作)
persistenceEngine.addWorker(worker)
case RegisterApplication(description, driver)用于注册worker:
// 处理Application注册请求,调用createApplication方法和registerApplication方法
case RegisterApplication(description, driver) =>
// TODO Prevent repeated registrations from some driver
if (state == RecoveryState.STANDBY) { // 当前master状态为STANDBY不是ALIVE,就忽视请求
// 忽略,什么都不回复
} else {
logInfo("Registering app " + description.name)
// 调用createApplication进行具体的application的创建
val app = createApplication(description, driver)
// 创建完注册,进入FIFO队列等待资源
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
// 持久化application,具体调用的是persistenceEngine中的persist方法,决定对象序列化和持久化的方式
persistenceEngine.addApplication(app)
// 这里会反向向SparkDeployScheduleBackend的AppClient的ClientActor发送消息,就是下面的RegisteredApplication
// (Actor是scala的多线程编程,Spark2.0之后是用的Netty,之前是Akka)
driver.send(RegisteredApplication(app.id, self))
schedule()
}
createApplication方法和registerApplication方法:
private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef):
ApplicationInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
// 用当前获得的时间戳进行相应处理,作为appId(String类型)
val appId = newApplicationId(date)
new ApplicationInfo(now, appId, desc, date, driver, defaultCores)
}
// 创建好之后在这里调用,注册application
private def registerApplication(app: ApplicationInfo): Unit = {
val appAddress = app.driver.address // driver地址必须唯一,不能重复注册
if (addressToApp.contains(appAddress)) {
logInfo("Attempted to re-register application at same address: " + appAddress)
return
}
// 去调度系统注册资源
applicationMetricsSystem.registerSource(app.appSource)
// 将注册好的application加入缓存中
apps += app // apps是一个HashSet,会去重
idToApp(app.id) = app
endpointToApp(app.driver) = app
addressToApp(appAddress) = app
// 加入队列,等待调度(目前的策略是先入先出)
waitingApps += app
if (reverseProxy) {
webUi.addProxyTargets(app.id, app.desc.appUiUrl)
}
}
completeRecovery函数:
private def completeRecovery() {}
state = RecoveryState.COMPLETING_RECOVERY
// 杀掉所有没有做出回应的app和worker(过滤UNKNOWN状态)
// 分别调用的是removeWorker和finishApplication方法
workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
// 把恢复了的application的状态修改为RUNNING.
apps.filter(_.state == ApplicationState.WAITING).foreach(_.state = ApplicationState.RUNNING)
removeWorker函数:
private def removeWorker(worker: WorkerInfo) {}
private def removeWorker(worker: WorkerInfo) {
// WorkerInfo中封装了Worker的host、端口、cores、内存、WebUi地址等信息
logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
// 无论是故障还是已经死亡,都将状态改为DEAD
worker.setState(WorkerState.DEAD)
// 在HashMap中除去对应Worker的id和地址
idToWorker -= worker.id
addressToWorker -= worker.endpoint.address
if (reverseProxy) {
webUi.removeProxyTargets(worker.id)
}
// 取得对应Worker上所有的executors,发送丢失信号之后删除executors
for (exec <- worker.executors.values) {
logInfo("Telling app of lost executor: " + exec.id)
exec.application.driver.send(ExecutorUpdated(
exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
exec.state = ExecutorState.LOST
exec.application.removeExecutor(exec)
}
// Worker上的driver如果配置了supervise属性,就将drivers放进schedule等待重新加载
// 没有配置supervise属性的直接杀掉
for (driver <- worker.drivers.values) {
if (driver.desc.supervise) {
logInfo(s"Re-launching ${driver.id}")
relaunchDriver(driver)
}
DriverStateChanged
case DriverStateChanged(driverId, state, exception) =>
state match { // 属于以下状态的任意一种,该Driver都将被删除
case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
removeDriver(driverId, state, exception)
case _ => // 其他的类型就抛异常
throw new Exception(s"Received unexpected state update for driver $driverId: $state")
}
private def removeDriver( // DriverStateChanged中被调用
driverId: String,
finalState: DriverState,
exception: Option[Exception]) {
// 用Scala的find高阶函数找到driverId对应的driver
drivers.find(d => d.id == driverId) match {
// 如果找到【使用Some(样例类-Option)】
case Some(driver) =>
logInfo(s"Removing driver: $driverId")
// 从缓存中移除
drivers -= driver
if (completedDrivers.size >= RETAINED_DRIVERS) {
val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
completedDrivers.trimStart(toRemove)
}
// completedDrivers中加入driver
completedDrivers += driver
// 用持久化引擎去除Driver的持久化信息
persistenceEngine.removeDriver(driver)
driver.state = finalState
driver.exception = exception
// 有该Driver的Worker,都删除掉该Driver
driver.worker.foreach(w => w.removeDriver(driver))
schedule()
case None =>
logWarning(s"Asked to remove unknown driver: $driverId")
}
}
}
ExecutorStateChanged
// 向driver同步发送ExecutorUpdated的消息
exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
// ExecutorState有LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED这几种状态
// 将KILLED, FAILED, LOST, EXITED四种状态认为是finished
if (ExecutorState.isFinished(state)) {
// 从worker和application中移除这个executor
logInfo(s"Removing executor ${exec.fullId} because it is $state")
if (!appInfo.isFinished) {
appInfo.removeExecutor(exec)
}
// 在运行着该executor的worker中移除该executor
exec.worker.removeExecutor(exec)
val normalExit = exitStatus == Some(0)
// application只尝试确定次数次重试,默认10(spark.deploy.maxExecutorRetries, 10)TODO:在哪里进行的application重试
// 用incrementRetryCount进行失败次数的统计
// 重要注意事项: 这段代码没有经过测试,所以在改变下面的if条件时一定要小心
if (!normalExit
&& appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
&& MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
val execs = appInfo.executors.values
if (!execs.exists(_.state == ExecutorState.RUNNING)) {
logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
s"${appInfo.retryCount} times; removing it")
// 反复尝试都失败,就将application删除
removeApplication(appInfo, ApplicationState.FAILED)
removeExecutor在ApplicationInfo.scala中
def removeExecutor(exec: ExecutorDesc) {
if (executors.contains(exec.fullId)) {
executors -= exec.fullId
coresUsed -= exec.cores
memoryUsed -= exec.memory
}
}
- WorkerInfo.scala
WorkerInfo类的一些属性:
val id: String,
val host: String,
val port: Int,
val cores: Int,
val memory: Int,
val endpoint: RpcEndpointRef,
val webUiAddress: String)
3.RecoveryState.scala
RecoveryState有STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY四个阶段
四、活学活用:
- 暂无
五、疑惑/TODO:
对于图1:仅standalong模式支持主备切换吗?
在修改注释时,总是:
尝试了半天也没确定scalastyle需要符合什么标准(我先暂时// scalastyle:off吧)
六、总结:
阅读Spark源码的一些技巧:
关注长段注释,那往往表明作者想要表达很多东西(或阐述一些建议)这种写法感觉日常可以用到,以及可变Buffer的实践:
workers.filter { w =>
(w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
}.foreach { w =>
workers -= w
}Scala的find高阶函数的应用
drivers.find(d => d.id == driverId) match {
case Some(driver) =>spark.deploy.defaultCores的值默认为Int.Max,也就是不限制的意思)从而应用程序可以使用所有当前可以获得的CPU资源。————
spark.cores.max 这个参数决定了在Standalone和Mesos模式下,一个Spark应用程序所能申请的CPU Core的数量。如果你没有并发跑多个Spark应用程序的需求,那么可以不需要设置这个参数,默认会使用spark.deploy.defaultCores的值。如果明确设置了这一参数,则当worker拥有足够的cores和内存时,就可以在同一个worker上启动来自同一应用程序的多个executors。 否则,每个executor会使用所有可用的cores,并且在这种情况下,每个worker只能启动一个executor。
如果要配置这个参数,一定要注意executors数量以及spark.executor.cores参数的值。
针对这个参数需要注意的是,这个参数对Yarn模式不起作用,YARN模式下,资源由Yarn统一调度管理,一个应用启动时所申请的CPU资源的数量由另外两个直接配置Executor数量和每个Executor中core数量的参数决定。scheduleExecutorsOnWorkers:默认将应用程序的executors分散到尽可能多的worker上(此模式通常更适合本地数据)
七、参考:
- http://blog.csdn.net/colorant/article/details/38681627(Spark性能相关参数配置)