Spark Analysis: The Master

Date: 2023-05-31 12:40:08
override def preStart() {
  logInfo("Starting Spark master at " + masterUrl)
  webUi.bind() // bind the web UI
  masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
  // Periodically check for Workers whose heartbeats have timed out
  context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)

  masterMetricsSystem.registerSource(masterSource)
  masterMetricsSystem.start()
  applicationMetricsSystem.start()

  // Persistence engine used for Master recovery
  persistenceEngine = RECOVERY_MODE match {
    case "ZOOKEEPER" =>
      logInfo("Persisting recovery state to ZooKeeper")
      new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)
    case "FILESYSTEM" =>
      logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
      new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
    case _ =>
      new BlackHolePersistenceEngine()
  }

  // Agent used for Master leader election
  leaderElectionAgent = RECOVERY_MODE match {
    case "ZOOKEEPER" =>
      context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))
    case _ =>
      context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
  }
}

// Messages handled by the Master (handler bodies omitted)
override def receive = {
  case RegisterWorker
  case RequestSubmitDriver
  case RegisterApplication
  case Heartbeat
  case ExecutorStateChanged
  ......
}
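
The RECOVERY_MODE match in preStart picks an implementation from a string setting and falls back to a no-op default. Below is a minimal sketch of that selection pattern without Akka or Spark on the classpath. The names PersistenceEngine, ZooKeeperEngine, FileSystemEngine, BlackHoleEngine and selectEngine are simplified stand-ins invented for illustration; they are not Spark's real classes or signatures.

```scala
// Hypothetical stand-ins for the real persistence engines; they only describe themselves.
sealed trait PersistenceEngine { def describe: String }

final case class ZooKeeperEngine(zkUrl: String) extends PersistenceEngine {
  def describe: String = "ZooKeeper at " + zkUrl
}

final case class FileSystemEngine(dir: String) extends PersistenceEngine {
  def describe: String = "directory " + dir
}

// No-op engine: nothing is persisted, so nothing can be recovered.
case object BlackHoleEngine extends PersistenceEngine {
  def describe: String = "nothing (recovery disabled)"
}

object RecoveryModeSketch {
  // Same shape as the match in preStart: two named modes, everything else is the no-op engine.
  def selectEngine(mode: String, zkUrl: String, dir: String): PersistenceEngine =
    mode match {
      case "ZOOKEEPER"  => ZooKeeperEngine(zkUrl)
      case "FILESYSTEM" => FileSystemEngine(dir)
      case _            => BlackHoleEngine
    }
}
```

Because of the wildcard case, any unrecognized mode string silently disables recovery instead of failing, which is worth keeping in mind when a configuration value is mistyped.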

Summary of the Master's main responsibilities:

1. Leader election among Masters;

2. Management of Drivers, Workers, and Applications, through the following messages:

  1) Driver: RequestSubmitDriver/RequestKillDriver/RequestDriverStatus/DriverStateChanged

  2) Worker: RegisterWorker/Heartbeat/WorkerSchedulerStateResponse/CheckForWorkerTimeOut/ExecutorStateChanged

  3) Application: RegisterApplication
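
To make the Worker messages more concrete, here is a rough sketch of how RegisterWorker, Heartbeat, and CheckForWorkerTimeOut could work together to detect dead Workers. It is a simplified illustration, not the Master's actual code. The message shapes are reduced to a single id field, WorkerRegistry is a hypothetical helper, and the current time is passed in explicitly so the logic can be exercised without a scheduler.

```scala
import scala.collection.mutable

// Simplified, hypothetical message shapes; the real messages carry more fields.
sealed trait MasterMessage
final case class RegisterWorker(id: String) extends MasterMessage
final case class Heartbeat(workerId: String) extends MasterMessage
case object CheckForWorkerTimeOut extends MasterMessage

// Hypothetical helper that records the last heartbeat time of each registered Worker.
final class WorkerRegistry(timeoutMs: Long) {
  private val lastHeartbeat = mutable.Map.empty[String, Long]

  def handle(msg: MasterMessage, nowMs: Long): Unit = msg match {
    case RegisterWorker(id) =>
      lastHeartbeat(id) = nowMs
    case Heartbeat(id) =>
      // Heartbeats from unknown (or already removed) Workers are ignored.
      if (lastHeartbeat.contains(id)) lastHeartbeat(id) = nowMs
    case CheckForWorkerTimeOut =>
      // Collect the expired ids first, then remove them, to avoid mutating while iterating.
      val expired = lastHeartbeat.filter { case (_, t) => nowMs - t > timeoutMs }.keys.toList
      expired.foreach(id => lastHeartbeat.remove(id))
  }

  def aliveWorkers: Set[String] = lastHeartbeat.keySet.toSet
}
```

In the Master itself the periodic trigger comes from the scheduler call in preStart, which sends CheckForWorkerTimeOut to the actor every WORKER_TIMEOUT milliseconds; in this sketch the caller plays that role by invoking handle(CheckForWorkerTimeOut, now).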