Master作为集群的Manager,对于集群的健壮运行发挥着十分重要的作用。下面,我们一起了解一下Master是听从Client(Leader)的号召,如何管理好Worker的吧。
1.家当(静态属性)
1.设置一个守护单线程的消息发送器,
private val forwardMessageThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
2.根据sparkConf得到hadoopConf
private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
3.一个bool类型的标识,如果设置为true,那么app的执行将会尽量分步到尽可能多的worker上,否则app的执行将会先用完一个worker的资源,然后再使用下一个worker的资源
private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
4.设置执行app默认的最大核数为Int类型的最大值
private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
5.还有一些关于worker、driver、app等的字段信息,都比较简单,限于篇幅限制就不一一列出了
2.技能(方法)
由于Master上本质上是一个RpcEndpoint,所以我们按照它的生命周期进行介绍。如果不明白,请看文章
Spark Rpc通信源码分析 http://www.cnblogs.com/yourarebest/p/5297157.html
1.构造函数就是Master默认的主构造器
2.onStart方法,主要功能是启动Jetty的WebUI服务,Rest服务、选出持久化引擎及持久化代理
override def onStart(): Unit = {
logInfo("Starting Spark master at " + masterUrl)
logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
webUi = new MasterWebUI(this, webUiPort)
//启动JettyServer并绑定webUI端口号
webUi.bind()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
//forwardMessageThread线程每1min中检查Worker是否宕了
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerTimeOut)
}
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
//启动Rest服务,默认端口6066
if (restServerEnabled) {
val port = conf.getInt("spark.master.rest.port", 6066)
restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
}
//返回绑定的端口号
restServerBoundPort = restServer.map(.start())
masterMetricsSystem.registerSource(masterSource)
masterMetricsSystem.start()
applicationMetricsSystem.start()
//当metrics系统启动后,将master和app的metrics servlet的hadnler给webui
masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
//序列化Spark的配置文件
val serializer = new JavaSerializer(conf)
//支持三种持久化引擎,将Spark的配置参数持久化,便于以后恢复使用
val (persistenceEngine, leaderElectionAgent_) = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, serializer)
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
case "FILESYSTEM" =>
val fsFactory =
new FileSystemRecoveryModeFactory(conf, serializer)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
.newInstance(conf, serializer)
.asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
}
3.onStop方法,停止master的metrics系统、停止app的metrics系统、取消异步执行的任务、停止WebUi服务、停止rest服务以及持久化引擎和选举代理的停止。
override def onStop() {
masterMetricsSystem.report()
applicationMetricsSystem.report()
//避免异步发出的CompleteRecovery消息导致master的重启
if (recoveryCompletionTask != null) {
recoveryCompletionTask.cancel(true)
}
if (checkForWorkerTimeOutTask != null) {
checkForWorkerTimeOutTask.cancel(true)
}
forwardMessageThread.shutdownNow()
webUi.stop()
restServer.foreach(_.stop())
masterMetricsSystem.stop()
applicationMetricsSystem.stop()
persistenceEngine.close()
leaderElectionAgent.stop()
}
还有一个重要的方法receive方法,留到下一篇吧。
【原】Spark中Master源码分析(一)的更多相关文章
-
【原】Spark中Master源码分析(二)
继续上一篇的内容.上一篇的内容为: Spark中Master源码分析(一) http://www.cnblogs.com/yourarebest/p/5312965.html 4.receive方法, ...
-
【原】Spark中Client源码分析(二)
继续前一篇的内容.前一篇内容为: Spark中Client源码分析(一)http://www.cnblogs.com/yourarebest/p/5313006.html DriverClient中的 ...
-
【原】 Spark中Worker源码分析(二)
继续前一篇的内容.前一篇内容为: Spark中Worker源码分析(一)http://www.cnblogs.com/yourarebest/p/5300202.html 4.receive方法, r ...
-
Spark中决策树源码分析
1.Example 使用Spark MLlib中决策树分类器API,训练出一个决策树模型,使用Python开发. """ Decision Tree Classifica ...
-
【原】Spark中Client源码分析(一)
在Spark Standalone中我们所谓的Client,它的任务其实是由AppClient和DriverClient共同完成的.AppClient是一个允许app(Client)和Spark集群通 ...
-
【原】 Spark中Worker源码分析(一)
Worker作为对于Spark集群的健壮运行起着举足轻重的作用,作为Master的奴隶,每15s向Master告诉自己还活着,一旦主人(Master>有了任务(Application),立马交给 ...
-
Spark Scheduler模块源码分析之DAGScheduler
本文主要结合Spark-1.6.0的源码,对Spark中任务调度模块的执行过程进行分析.Spark Application在遇到Action操作时才会真正的提交任务并进行计算.这时Spark会根据Ac ...
-
Spark Scheduler模块源码分析之TaskScheduler和SchedulerBackend
本文是Scheduler模块源码分析的第二篇,第一篇Spark Scheduler模块源码分析之DAGScheduler主要分析了DAGScheduler.本文接下来结合Spark-1.6.0的源码继 ...
-
Spark RPC框架源码分析(三)Spark心跳机制分析
一.Spark心跳概述 前面两节中介绍了Spark RPC的基本知识,以及深入剖析了Spark RPC中一些源码的实现流程. 具体可以看这里: Spark RPC框架源码分析(二)运行时序 Spark ...
随机推荐
-
python 线程之 threading(一)
threading:基于对象和类的较高层面上的接口,threading模块在内部使用_thread模块来实现线程的对象以及常用的同步化工具的功能. 使用定制类的方式继承 threading.Threa ...
-
Android 客户端和服务器 json交互
http://www.cnblogs.com/jyan/articles/2544974.html 1.JSON(JavaScript Object Notation) 是一种轻量级的数据交换格式. ...
-
HDU-4632 http://acm.hdu.edu.cn/showproblem.php?pid=4632
http://acm.hdu.edu.cn/showproblem.php?pid=4632 题意: 一个字符串,有多少个subsequence是回文串. 别人的题解: 用dp[i][j]表示这一段里 ...
-
java新手笔记6 示例for
1.计算天数 /*给定一个年月日,计算是一年的第几天 (如输入:2 15 结果:第46天) */ public class Demo1 { public static void main(String ...
-
uva 11732 - strcmp() Anyone? 不错的Trie题
题解:http://blog.csdn.net/u013480600/article/details/23122503 我的代码一直TLE,,,看了人家的之后,认为1.链式前向星比較好,2.*dept ...
-
HTML5 拖放(Drag 和 Drop)功能开发——基础实战
随着HTML5的普及度越来越高,现在写代码也遇到一些了,经过同事的点播开展了一次Dojo活动用以技术交流,我也乘此机会将HTML5的拖放功能整理了一下. 简介 拖拽(Drag/Drop)是个非常普遍的 ...
-
linux下双网卡的绑定
如果服务器上有两快网卡就可以把它绑定为一块虚拟的网卡,如果一块网卡坏了另一块还可以继续工作,增加了冗余度和负载,具体做法如下: 新建一个虚拟的网卡,命令如下: iv /etc/sysconfig/ne ...
-
JAVA 单步调试快捷键
JAVA 单步调试快捷键以debug方式运行java程序后 (F8)直接执行程序.遇到断点时暂停:(F5)单步执行程序,遇到方法时进入:(F6)单步执行程序,遇到方法时跳过:(F7)单步执行程序,从当 ...
-
Jenkins结合.net平台综合之监听git仓库并自动摘取最新代码编译
前面章节我们讲解了Jenkins结合.net平台工具以及一些第三方工具实现项目自动还原,自动编译,自动测试和自动发布.然而实现自动化还有一个关键的步骤就是监听源码仓库变化然后从仓库拉取最新代码,然后再 ...
-
memcached笔记
启动memcached:./memcached -d -m 10 -l 127.0.0.1 -p 11211 -u root 连接memcached:telnet 127.0.0.1 11211 查看 ...