1. Startup Script
sbin/start-master.sh
"$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
(1) SPARK_MASTER_IP
(2) SPARK_MASTER_PORT
(3) SPARK_MASTER_WEBUI_PORT
The Master class is ultimately launched through the bin/spark-class script.
The argument "1" is the Master instance number; it only affects the names of the generated log and pid files and is not passed to the Master class:
spark-xxx-org.apache.spark.deploy.master.Master-1-CentOS-01.out
spark-xxx-org.apache.spark.deploy.master.Master-1.pid
2. Master.main
-
def main(argStrings: Array[String]) {
  SignalLogger.register(log)
  val conf = new SparkConf
  val args = new MasterArguments(argStrings, conf)
  val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
  actorSystem.awaitTermination()
}
(1) Create a MasterArguments object and initialize its members;
(2) Call startSystemAndActor to create the ActorSystem and start the Master actor.
2.1. MasterArguments
-
parse(args.toList)

// This mutates the SparkConf, so all accesses to it must be made after this line
propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
(1) parse handles the command-line arguments passed by the startup script (a rough sketch of such a parse loop follows below);
(2) loadDefaultSparkProperties loads Spark runtime properties from the properties file, which defaults to spark-defaults.conf.
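As a rough illustration (not the real MasterArguments), this kind of parsing is usually a tail-recursive match over the argument list. The sketch below handles only the three options that start-master.sh passes; the defaults 7077 and 8080 are the standalone Master's usual port and web UI port:
-
// Illustrative sketch of a tail-recursive option parser for --ip / --port / --webui-port.
object ArgParseSketch {
  case class MasterOpts(host: String = "localhost", port: Int = 7077, webUiPort: Int = 8080)

  @annotation.tailrec
  def parse(args: List[String], opts: MasterOpts): MasterOpts = args match {
    case "--ip" :: value :: tail         => parse(tail, opts.copy(host = value))
    case "--port" :: value :: tail       => parse(tail, opts.copy(port = value.toInt))
    case "--webui-port" :: value :: tail => parse(tail, opts.copy(webUiPort = value.toInt))
    case Nil                             => opts
    case unknown :: _                    => sys.error(s"Unknown option: $unknown")
  }

  def main(args: Array[String]): Unit = {
    val opts = parse(List("--ip", "master01", "--port", "7077", "--webui-port", "8080"), MasterOpts())
    println(opts) // MasterOpts(master01,7077,8080)
  }
}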
2.2. startSystemAndActor
-
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
  securityManager = securityMgr)
val actor = actorSystem.actorOf(
  Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
(1) Create the ActorSystem through AkkaUtils.createActorSystem;
(2) Create and start the Master actor (a minimal Akka sketch of the same pattern follows below).
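For context, the two calls above are standard Akka usage under the hood. The following is a minimal standalone sketch, assuming Akka 2.3.x (the version these Spark releases ship with); EchoActor is just a stand-in for Master:
-
import akka.actor.{Actor, ActorSystem, Props}

// Stand-in for Master, only to show the same Props/actorOf pattern.
class EchoActor extends Actor {
  def receive = { case msg => sender ! msg }
}

object ActorSystemSketch {
  def main(args: Array[String]): Unit = {
    // AkkaUtils.createActorSystem is essentially ActorSystem(...) plus Spark-specific
    // configuration (host/port binding, serializer, timeouts) and returns the bound port.
    val actorSystem = ActorSystem("sparkMaster")
    val actor = actorSystem.actorOf(Props(classOf[EchoActor]), name = "EchoActor")
    println(s"Started $actor")
    actorSystem.shutdown() // Akka 2.3 API; replaced by terminate() in later Akka versions
  }
}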
3. Master Actor
3.1. Key Data Members
-
val workers = new HashSet[WorkerInfo]
val idToWorker = new HashMap[String, WorkerInfo]
val addressToWorker = new HashMap[Address, WorkerInfo]

val apps = new HashSet[ApplicationInfo]
val idToApp = new HashMap[String, ApplicationInfo]
val actorToApp = new HashMap[ActorRef, ApplicationInfo]
val addressToApp = new HashMap[Address, ApplicationInfo]
val waitingApps = new ArrayBuffer[ApplicationInfo]
val completedApps = new ArrayBuffer[ApplicationInfo]
var nextAppNumber = 0
val appIdToUI = new HashMap[String, SparkUI]

val drivers = new HashSet[DriverInfo]
val completedDrivers = new ArrayBuffer[DriverInfo]
val waitingDrivers = new ArrayBuffer[DriverInfo] // Drivers currently spooled for scheduling
3.2. Master.preStart
-
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
Subscribe to RemotingLifecycleEvent events; RemotingLifecycleEvent is a sealed trait:
-
sealed trait RemotingLifecycleEvent extends Serializable {
  def logLevel: Logging.LogLevel
}
Of these lifecycle events, the Master only handles DisassociatedEvent.
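The eventStream subscription mechanism itself can be tried out without any remoting. In the sketch below, Disconnected is a hypothetical event type standing in for DisassociatedEvent; only the subscribe/publish pattern mirrors what the Master does:
-
import akka.actor.{Actor, ActorSystem, Props}

// Hypothetical event type, used only to demonstrate eventStream.subscribe/publish.
case class Disconnected(address: String)

class Listener extends Actor {
  def receive = {
    case Disconnected(addr) => println(s"$addr got disassociated, removing it.")
  }
}

object EventStreamSketch {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("sketch")
    val listener = system.actorOf(Props[Listener], "listener")
    // Subscribe the listener to Disconnected events, then publish one.
    system.eventStream.subscribe(listener, classOf[Disconnected])
    system.eventStream.publish(Disconnected("akka.tcp://sparkWorker@host:7078"))
    Thread.sleep(500) // give the actor time to process before shutting down
    system.shutdown()
  }
}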
-
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
Start a timer that checks for Worker timeouts: once per Worker timeout period, a CheckForWorkerTimeOut message is sent to the Master itself. The default timeout is 60 seconds and can be changed through the spark.worker.timeout property.
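As a small illustration (not part of the Master source), the property can be overridden programmatically, though in practice it is usually set in spark-defaults.conf. The value is interpreted as seconds:
-
import org.apache.spark.SparkConf

// Illustrative only: raise the Worker timeout from the default 60s to 120s.
// The Master reads this value in seconds and converts it to milliseconds internally.
object WorkerTimeoutConf {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().set("spark.worker.timeout", "120")
    println(conf.get("spark.worker.timeout")) // 120
  }
}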
-
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory = new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system))
    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  case "FILESYSTEM" =>
    val fsFactory = new FileSystemRecoveryModeFactory(conf, SerializationExtension(context.system))
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  case "CUSTOM" =>
    val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(conf.getClass, Serialization.getClass)
      .newInstance(conf, SerializationExtension(context.system))
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
The persistence engine and leader election agent are created according to RECOVERY_MODE. RECOVERY_MODE defaults to NONE and is set through the spark.deploy.recoveryMode property (a configuration sketch for the ZOOKEEPER mode follows after the list below).
Assume RECOVERY_MODE is NONE:
(1) a BlackHolePersistenceEngine is created, which performs no persistence at all;
(2) a MonarchyLeaderAgent is created, whose primary constructor sends an ElectedLeader message to the Master.
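To actually enable high availability, the recovery mode and its ZooKeeper settings are normally passed to the Master's JVM (for example via SPARK_DAEMON_JAVA_OPTS). The sketch below only shows which properties are involved; the ensemble address zk1:2181,zk2:2181 is a placeholder, not something taken from this article's cluster:
-
import org.apache.spark.SparkConf

// Hedged sketch: switch the Master to ZooKeeper-based recovery.
object RecoveryModeConf {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.deploy.recoveryMode", "ZOOKEEPER")
      .set("spark.deploy.zookeeper.url", "zk1:2181,zk2:2181") // placeholder ensemble
    println(conf.get("spark.deploy.recoveryMode")) // ZOOKEEPER
  }
}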
3.3. Master Message Handling
3.3.1. The ElectedLeader Message
-
case ElectedLeader => {
  val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
  state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
    RecoveryState.ALIVE
  } else {
    RecoveryState.RECOVERING
  }
  logInfo("I have been elected leader! New state: " + state)
  if (state == RecoveryState.RECOVERING) {
    beginRecovery(storedApps, storedDrivers, storedWorkers)
    recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
      CompleteRecovery)
  }
}
Since RECOVERY_MODE was assumed to be NONE above, no recovery is performed and state is set directly to RecoveryState.ALIVE.
3.3.2. The CheckForWorkerTimeOut Message
-
case CheckForWorkerTimeOut => {
  timeOutDeadWorkers()
}
Check for timed-out Worker nodes. The Worker timeout defaults to 60 seconds and is set through the spark.worker.timeout property.
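The check itself boils down to a heartbeat comparison: a Worker whose last heartbeat is older than the timeout window is removed. Below is a simplified, self-contained sketch of that idea; SimpleWorker is a made-up stand-in for WorkerInfo, and the real timeOutDeadWorkers additionally handles workers already marked DEAD:
-
// Simplified sketch of heartbeat-based worker expiry (not the Master's actual code).
object WorkerTimeoutSketch {
  case class SimpleWorker(id: String, lastHeartbeat: Long)

  val workerTimeoutMs = 60 * 1000L // default spark.worker.timeout of 60s, in millis

  // Workers whose last heartbeat is older than the timeout window are considered dead.
  def timedOut(workers: Seq[SimpleWorker], now: Long): Seq[SimpleWorker] =
    workers.filter(_.lastHeartbeat < now - workerTimeoutMs)

  def main(args: Array[String]): Unit = {
    val now = System.currentTimeMillis()
    val workers = Seq(
      SimpleWorker("worker-1", now - 10 * 1000),   // heartbeat 10s ago  -> alive
      SimpleWorker("worker-2", now - 120 * 1000))  // heartbeat 120s ago -> timed out
    timedOut(workers, now).foreach(w => println(s"Removing ${w.id}: no recent heartbeat"))
  }
}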
3.3.3. The DisassociatedEvent Message
-
case DisassociatedEvent(_, address, _) => {
  // The disconnected client could've been either a worker or an app; remove whichever it was
  logInfo(s"$address got disassociated, removing it.")
  addressToWorker.get(address).foreach(removeWorker)
  addressToApp.get(address).foreach(finishApplication)
  if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
3.3.4. The RegisterWorker Message
This is the registration message a Worker sends to the Master.
-
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, sender, workerUiPort,
  publicAddress)
if (registerWorker(worker)) {
  persistenceEngine.addWorker(worker)
  sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
  schedule()
}
(1) Create a WorkerInfo object;
(2) Call registerWorker to record the Worker's information;
(3) Reply to the Worker with a RegisteredWorker message;
(4) Call schedule, which allocates resources to waiting Drivers and Apps (a simplified sketch of the idea follows below).
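schedule() walks the alive Workers and hands out cores to waiting applications, either spread across as many Workers as possible (spark.deploy.spreadOut, the default) or packed onto as few as possible. The following is a much-simplified, self-contained sketch of the spread-out idea only; FreeWorker and assignSpreadOut are illustrative names, not the Master's actual code:
-
// Illustrative spread-out core assignment: give one core at a time, round-robin, to the
// workers with free cores until the app's demand is met or no cores remain.
object SpreadOutSketch {
  case class FreeWorker(id: String, var freeCores: Int)

  def assignSpreadOut(workers: Seq[FreeWorker], coresWanted: Int): Map[String, Int] = {
    val usable = workers.filter(_.freeCores > 0)
    val assigned = scala.collection.mutable.Map[String, Int]().withDefaultValue(0)
    var left = coresWanted
    var progress = true
    while (left > 0 && progress) {
      progress = false
      for (w <- usable if left > 0 && w.freeCores > 0) {
        w.freeCores -= 1
        assigned(w.id) += 1
        left -= 1
        progress = true
      }
    }
    assigned.toMap
  }

  def main(args: Array[String]): Unit = {
    val workers = Seq(FreeWorker("w1", 4), FreeWorker("w2", 2), FreeWorker("w3", 3))
    println(assignSpreadOut(workers, 6)) // e.g. Map(w1 -> 2, w2 -> 2, w3 -> 2)
  }
}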
3.3.4.1. WorkerInfo
-
private[spark] class WorkerInfo(
    val id: String,
    val host: String,
    val port: Int,
    val cores: Int,
    val memory: Int,
    val actor: ActorRef,
    val webUiPort: Int,
    val publicAddress: String)
  extends Serializable {
  ...
  init()
  ...
  private def init() {
    executors = new mutable.HashMap
    drivers = new mutable.HashMap
    state = WorkerState.ALIVE
    coresUsed = 0
    memoryUsed = 0
    lastHeartbeat = System.currentTimeMillis()
  }
A WorkerInfo object is constructed, and init() is called to initialize its mutable state (executor and driver maps, resource usage counters, and the last-heartbeat timestamp).
3.3.4.2. Master.registerWorker
-
workers.filter { w =>
  (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
}.foreach { w =>
  workers -= w
}
Remove any existing WorkerInfo with the same host and port whose state is DEAD.
-
val workerAddress = worker.actor.path.address
if (addressToWorker.contains(workerAddress)) {
  val oldWorker = addressToWorker(workerAddress)
  if (oldWorker.state == WorkerState.UNKNOWN) {
    // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
    // The old worker must thus be dead, so we will remove it and accept the new worker.
    removeWorker(oldWorker)
  } else {
    logInfo("Attempted to re-register worker at same address: " + workerAddress)
    return false
  }
}

workers += worker
idToWorker(worker.id) = worker
addressToWorker(workerAddress) = worker
Record the WorkerInfo in workers, idToWorker, and addressToWorker.
4. Startup Complete
At this point, the Master startup process is finished.
The Master now waits for message requests from Workers and Drivers.