在 Master 启动过程中,RPC 框架(NettyRpcEnv)首先会调用 Master(作为 RpcEndpoint)的 onStart 方法。
// RPC lifecycle hook, invoked when the Master endpoint starts up.
// Initialization order matters: the web UI must be bound before its URL is
// published and before the metrics servlet handlers are attached to it.
override def onStart(): Unit = {
logInfo("Starting Spark master at " + masterUrl)
logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
webUi = new MasterWebUI(this, webUiPort)
webUi.bind()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
// Periodically send CheckForWorkerTimeOut to ourselves. Per the original
// author's note: the handler marks a timed-out worker DEAD and cleans up its
// in-memory state; for every executor on that worker it sends ExecutorUpdated
// to the driver (executor no longer usable); for every driver on that worker,
// if the driver is configured to be relaunched it is rescheduled onto an
// available worker, otherwise its in-memory info is dropped. The worker entry
// is only fully removed after it has timed out many times; before that it is
// merely excluded from scheduling.
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerTimeOut)
}
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
if (restServerEnabled) {
val port = conf.getInt("spark.master.rest.port", 6066)
restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
}
restServerBoundPort = restServer.map(_.start())
masterMetricsSystem.registerSource(masterSource)
masterMetricsSystem.start()
applicationMetricsSystem.start()
// Attach the master and app metrics servlet handler to the web ui after the metrics systems are
// started.
masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
// Master HA setup: pick the persistence engine (metadata read/write plus
// serialization) and the leader election agent according to RECOVERY_MODE.
val serializer = new JavaSerializer(conf)
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, serializer)
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
case "FILESYSTEM" =>
val fsFactory =
new FileSystemRecoveryModeFactory(conf, serializer)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
// User-supplied factory class; must expose a (SparkConf, Serializer) ctor.
val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
.newInstance(conf, serializer)
.asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
// No HA configured: no-op persistence, this master is always the leader.
(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
}
上面的persistenceEngine_封装了在zk中读写元数据信息,以及序列化反序列化的接口
leaderElectionAgent_封装了master的选举过程,见下面代码注释中的解释
以下为 Scala 代码:
// Handles leader election for the Master via a Curator LeaderLatch in
// ZooKeeper. Competing masters join the latch under WORKING_DIR; Curator
// invokes the isLeader()/notLeader() callbacks as leadership changes hands.
// (Cleaned up: the pasted excerpt had a stray "- " prefix on every line.)
private[master] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElectable,
    conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging {

  // znode directory used for the election, e.g. /spark/leader_election
  val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"

  private var zk: CuratorFramework = _
  private var leaderLatch: LeaderLatch = _
  private var status = LeadershipStatus.NOT_LEADER

  start()

  // Connect to ZooKeeper and join the leader latch; Curator calls back into
  // this listener when leadership is gained or lost.
  private def start() {
    logInfo("Starting ZooKeeper LeaderElection agent")
    zk = SparkCuratorUtil.newClient(conf)
    leaderLatch = new LeaderLatch(zk, WORKING_DIR)
    leaderLatch.addListener(this)
    leaderLatch.start()
  }

  override def stop() {
    leaderLatch.close()
    zk.close()
  }

  // Curator callback: leadership may have been gained. Re-check hasLeadership
  // because the notification can race with a later leadership change.
  override def isLeader() {
    synchronized {
      if (!leaderLatch.hasLeadership) {
        return
      }
      logInfo("We have gained leadership")
      updateLeadershipStatus(true)
    }
  }

  // Curator callback: leadership may have been lost (same race-check pattern).
  override def notLeader() {
    synchronized {
      if (leaderLatch.hasLeadership) {
        return
      }
      logInfo("We have lost leadership")
      updateLeadershipStatus(false)
    }
  }

  // Notify the Master only on real transitions, so duplicate callbacks do not
  // trigger repeated electedLeader()/revokedLeadership() calls.
  private def updateLeadershipStatus(isLeader: Boolean) {
    if (isLeader && status == LeadershipStatus.NOT_LEADER) {
      status = LeadershipStatus.LEADER
      masterActor.electedLeader()
    } else if (!isLeader && status == LeadershipStatus.LEADER) {
      status = LeadershipStatus.NOT_LEADER
      masterActor.revokedLeadership()
    }
  }

  private object LeadershipStatus extends Enumeration {
    type LeadershipStatus = Value
    val LEADER, NOT_LEADER = Value
  }
}
继续查看master中的逻辑
以下为 Scala 代码:
// Excerpt of the Master's message handlers relevant to HA fail-over.
// (Cleaned up: the pasted excerpt had stray "- " prefixes and was left
// unclosed; other handlers are elided below.)
override def receiveWithLogging: PartialFunction[Any, Unit] = {
  case ElectedLeader => {
    // Read persisted apps/drivers/workers from the recovery backend. With no
    // persisted state we are immediately ALIVE; otherwise enter RECOVERING.
    val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
    state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
      RecoveryState.ALIVE
    } else {
      RecoveryState.RECOVERING
    }
    logInfo("I have been elected leader! New state: " + state)
    if (state == RecoveryState.RECOVERING) {
      beginRecovery(storedApps, storedDrivers, storedWorkers)
      // Force recovery to complete after WORKER_TIMEOUT even if some
      // workers/apps never re-register.
      recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
        CompleteRecovery)
    }
  }

  case CompleteRecovery => completeRecovery()

  case RevokedLeadership => {
    // A standby master took over; this one must stop serving immediately.
    logError("Leadership has been revoked -- master shutting down.")
    System.exit(0)
  }

  // ... other message handlers elided in this excerpt ...
}
开始恢复
以下为 Scala 代码:
// Rebuild in-memory state from persisted metadata and ask every known app
// driver and worker to acknowledge the new master.
// (Cleaned up: stray "- " prefixes removed; also fixes a typo in the pasted
// excerpt, "wworker.endpoint.send" -> "worker.endpoint.send".)
private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
    storedWorkers: Seq[WorkerInfo]) {
  for (app <- storedApps) {
    logInfo("Trying to recover app: " + app.id)
    try {
      registerApplication(app)
      // UNKNOWN until the driver replies with MasterChangeAcknowledged.
      app.state = ApplicationState.UNKNOWN
      app.driver.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
    }
  }

  // Drivers are only registered here; they are re-attached to their workers
  // later, when workers report back via WorkerSchedulerStateResponse.
  for (driver <- storedDrivers) {
    drivers += driver
  }

  for (worker <- storedWorkers) {
    logInfo("Trying to recover worker: " + worker.id)
    try {
      registerWorker(worker)
      // UNKNOWN until the worker replies with WorkerSchedulerStateResponse.
      worker.state = WorkerState.UNKNOWN
      worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
    }
  }
}
看driver端收到MasterChanged消息会发生什么?在AppClient.scala中
只有主master会发送MasterChanged消息,所以这里的masterUrl肯定是新的主master的
以下为 Scala 代码:
// AppClient (driver) side: a new master announced itself. Switch over and
// acknowledge so the master can move this app back to WAITING.
// (Cleaned up: stray "- " prefixes removed from the pasted excerpt.)
case MasterChanged(masterUrl, masterWebUiUrl) =>
  logInfo("Master has changed, new master is at " + masterUrl)
  changeMaster(masterUrl)
  alreadyDisconnected = false
  sender ! MasterChangeAcknowledged(appId)
master这时会收到所有app中driver发来的消息,我们看master收到MasterChangeAcknowledged消息的处理方式,参数为appId。以下为 Scala 代码:
// Master side: an app driver acknowledged the fail-over. Move the app from
// UNKNOWN back to WAITING; once nothing is left UNKNOWN, recovery can finish
// early instead of waiting for the timeout.
// (Cleaned up: stray "- " prefixes removed from the pasted excerpt.)
case MasterChangeAcknowledged(appId) => {
  idToApp.get(appId) match {
    case Some(app) =>
      logInfo("Application has been re-registered: " + appId)
      app.state = ApplicationState.WAITING
    case None =>
      logWarning("Master change ack from unknown app: " + appId)
  }

  if (canCompleteRecovery) { completeRecovery() }
}
看worker端收到MasterChanged消息会发生什么?在Worker.scala中
以下为 Scala 代码:
// Worker side: switch to the new master and report every executor and driver
// currently running on this worker so the master can rebuild its view.
// (Cleaned up: stray "- " prefixes removed from the pasted excerpt.)
case MasterChanged(masterUrl, masterWebUiUrl) =>
  logInfo("Master has changed, new master is at " + masterUrl)
  changeMaster(masterUrl, masterWebUiUrl)

  val execs = executors.values.
    map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
  sender ! WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq)
继续看master中的处理逻辑
以下为 Scala 代码:
// Master side: a worker reported its executors/drivers after fail-over. Mark
// it ALIVE and re-attach its executors and drivers to the master's
// bookkeeping structures.
// (Cleaned up: stray "- " prefixes removed from the pasted excerpt.)
case WorkerSchedulerStateResponse(workerId, executors, driverIds) => {
  idToWorker.get(workerId) match {
    case Some(worker) =>
      logInfo("Worker has been re-registered: " + workerId)
      worker.state = WorkerState.ALIVE

      // Keep only executors whose application is still known to the master.
      val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
      for (exec <- validExecutors) {
        val app = idToApp.get(exec.appId).get
        val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
        worker.addExecutor(execInfo)
        execInfo.copyState(exec)
      }

      // Re-bind each recovered driver to the worker it is running on.
      for (driverId <- driverIds) {
        drivers.find(_.id == driverId).foreach { driver =>
          driver.worker = Some(worker)
          driver.state = DriverState.RUNNING
          worker.drivers(driverId) = driver
        }
      }
    case None =>
      logWarning("Scheduler state from unknown worker: " + workerId)
  }

  if (canCompleteRecovery) { completeRecovery() }
}
这一切都处理完毕之后,看master的completeRecovery,这个是在beginRecovery调用之后,在延迟worker_timeout时间之后调用,一般情况下,上面的消息来回发送处理应该都已经结束了
以下为 Scala 代码:
// Finish recovery: anything that never re-registered within the grace period
// is dropped (workers/apps) or relaunched/removed (drivers), then normal
// scheduling resumes.
// (Cleaned up: stray "- " prefixes removed from the pasted excerpt.)
private def completeRecovery() {
  // Short synchronized guard so recovery completes exactly once — it can be
  // triggered both by the timeout and by the last acknowledgement arriving.
  synchronized {
    if (state != RecoveryState.RECOVERING) { return }
    state = RecoveryState.COMPLETING_RECOVERY
  }

  // Drop workers and apps that never responded to MasterChanged.
  workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
  apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

  // Drivers that no worker claimed: relaunch if supervised, otherwise remove
  // them with an ERROR state.
  drivers.filter(_.worker.isEmpty).foreach { d =>
    logWarning(s"Driver ${d.id} was not found after master recovery")
    if (d.desc.supervise) {
      logWarning(s"Re-launching ${d.id}")
      relaunchDriver(d)
    } else {
      removeDriver(d.id, DriverState.ERROR, None)
      logWarning(s"Did not re-launch ${d.id} because it was not supervised")
    }
  }

  state = RecoveryState.ALIVE
  schedule()
  logInfo("Recovery complete - resuming operations!")
}