继续前一篇的内容。前一篇内容为:
Spark中Client源码分析(一)http://www.cnblogs.com/yourarebest/p/5313006.html
DriverClient中的代码比较简单,它只有一个main函数,同时,和AppClient一样,它也有一个ClientEndpoint,只是两者的用途不一样。
1.Client
Client中唯一的main方法如下:
def main(args: Array[String]) {
if (!sys.props.contains("SPARK_SUBMIT")) {
println("WARNING: This client is deprecated and will be removed in a future version of Spark")
println("Use ./bin/spark-submit with "--master spark://host:port"")
}
val conf = new SparkConf()
val driverArgs = new ClientArguments(args)
if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
conf.set("spark.akka.logLifecycleEvents", "true")
}
conf.set("spark.rpc.askTimeout", "10")
conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
Logger.getRootLogger.setLevel(driverArgs.logLevel)
//创建一个driverClient的Rpc环境,并将得到Master和client的远程引用
val rpcEnv =
RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
map(rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, _, Master.ENDPOINT_NAME))
//clientpoint
rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
//启动rpc环境
rpcEnv.awaitTermination()
}
2.ClientEndpoint
ClientEndPoint可以看作给Driver传递消息的代理
属性简单,直接略过。
(1)构造函数为ClientEndPoint主构造函数
(2)onstart方法如下,
override def onStart(): Unit = {
driverArgs.cmd match {
case "launch" =>
//driver包装类,使得Worker和Driver的Rpc环境一样,做到共进退
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
//driver类路径
val classPathConf = "spark.driver.extraClassPath"
val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
//driver库路径
val libraryPathConf = "spark.driver.extraLibraryPath"
val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
//driver Jvm参数
val extraJavaOptsConf = "spark.driver.extraJavaOptions"
val extraJavaOpts = sys.props.get(extraJavaOptsConf)
.map(Utils.splitCommandString).getOrElse(Seq.empty)
//将所有的在SparkConf中设置的属性赋值给java options的序列
val sparkJavaOpts = Utils.sparkJavaOpts(conf)
//所有的javaOpts
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = new Command(mainClass,
Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
sys.env, classPathEntries, libraryPathEntries, javaOpts)
//将以上所有的信息封装在DriverDescription中
val driverDescription = new DriverDescription(
driverArgs.jarUrl,
driverArgs.memory,
driverArgs.cores,
driverArgs.supervise,
command)
//异步请求给master发送Driver的信息
ayncSendToMasterAndForwardReplySubmitDriverResponse
case "kill" =>
val driverId = driverArgs.driverId
ayncSendToMasterAndForwardReplyKillDriverResponse
}
}
(3)onstop方法简单,略过。
(4)receive方法如下,
override def receive: PartialFunction[Any, Unit] = {
//收到master的响应回来的Driver信息,因为master是管家,Client是老板
case SubmitDriverResponse(master, success, driverId, message) =>
logInfo(message)
if (success) {
//将当前的activeMasterEndpoint设置为响应消息的master
activeMasterEndpoint = master
//找到driver的信息然后退出JVM
pollAndReportStatus(driverId.get)
} else if (!Utils.responseFromBackup(message)) {
System.exit(-1)
}
case KillDriverResponse(master, driverId, success, message) =>
logInfo(message)
if (success) {
activeMasterEndpoint = master
pollAndReportStatus(driverId),详见下①
} else if (!Utils.responseFromBackup(message)) {
System.exit(-1)
}
}
①pollAndReportStatus方法如下,用于找到driver的信息然后退出JVM
def pollAndReportStatus(driverId: String) {
logInfo("... waiting before polling master for driver state")
Thread.sleep(5000)
logInfo("... polling master for driver state")
//master请求得到Driver的信息
val statusResponse =
activeMasterEndpoint.askWithRetryDriverStatusResponse
statusResponse.found match {
case false =>
logError(s"ERROR: Cluster master did not recognize $driverId")
System.exit(-1)
case true =>
logInfo(s"State of $driverId is ${statusResponse.state.get}")
//返回的其实是worker的信息
(statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
logInfo(s"Driver running on $hostPort ($id)")
case _ =>
}
statusResponse.exception.map { e =>
logError(s"Exception from cluster was: $e")
e.printStackTrace()
System.exit(-1)
}
System.exit(0)
}
}
【原】Spark中Client源码分析(二)的更多相关文章
-
【原】Spark中Client源码分析(一)
在Spark Standalone中我们所谓的Client,它的任务其实是由AppClient和DriverClient共同完成的.AppClient是一个允许app(Client)和Spark集群通 ...
-
【原】Spark中Master源码分析(二)
继续上一篇的内容.上一篇的内容为: Spark中Master源码分析(一) http://www.cnblogs.com/yourarebest/p/5312965.html 4.receive方法, ...
-
【原】 Spark中Worker源码分析(二)
继续前一篇的内容.前一篇内容为: Spark中Worker源码分析(一)http://www.cnblogs.com/yourarebest/p/5300202.html 4.receive方法, r ...
-
【原】Spark中Master源码分析(一)
Master作为集群的Manager,对于集群的健壮运行发挥着十分重要的作用.下面,我们一起了解一下Master是听从Client(Leader)的号召,如何管理好Worker的吧. 1.家当(静态属 ...
-
Spark中决策树源码分析
1.Example 使用Spark MLlib中决策树分类器API,训练出一个决策树模型,使用Python开发. """ Decision Tree Classifica ...
-
【原】 Spark中Worker源码分析(一)
Worker作为对于Spark集群的健壮运行起着举足轻重的作用,作为Master的奴隶,每15s向Master告诉自己还活着,一旦主人(Master>有了任务(Application),立马交给 ...
-
Spark RPC框架源码分析(二)RPC运行时序
前情提要: Spark RPC框架源码分析(一)简述 一. Spark RPC概述 上一篇我们已经说明了Spark RPC框架的一个简单例子,Spark RPC相关的两个编程模型,Actor模型和Re ...
-
Docker源码分析(二):Docker Client创建与命令执行
1. 前言 如今,Docker作为业界领先的轻量级虚拟化容器管理引擎,给全球开发者提供了一种新颖.便捷的软件集成测试与部署之道.在团队开发软件时,Docker可以提供可复用的运行环境.灵活的资源配置. ...
-
Spark Scheduler模块源码分析之TaskScheduler和SchedulerBackend
本文是Scheduler模块源码分析的第二篇,第一篇Spark Scheduler模块源码分析之DAGScheduler主要分析了DAGScheduler.本文接下来结合Spark-1.6.0的源码继 ...
随机推荐
-
javaScript的call关键字
function add(a, b) { alert(a + b); } function sub(a, b) { alert(a - b); } add.call(sub,3,1); //这个例子中 ...
-
Ruby--学习记录(实时更新)
变量的命名方式决定了变量的种类: 局部变量 以英文字母或者_开头: 全局变量 以$开头: 实例变量 以@开头: 类变量 以@@开头:
-
《ASP.NET SignalR系列》第一课 认识SignalR
从现在开始相关文章请到: http://lko2o.com/moon 一.概述 ASP.NET signalr对ASP.NET开发者来说是一个新的程序库,它能让我们更加容易便捷地开发实时通信功能; s ...
-
PHP中MySql函数收集
1.array mysql_fetch_assoc ( resource $result ) 从结果集中取得一行作为关联数组 说明: 返回对应结果集的关联数组,并且继续移动内部数据指针. 参数:re ...
-
Amoeba:开源的分布式数据库Porxy解决方案
http://www.biaodianfu.com/amoeba.html 什么是Amoeba? Amoeba(变形虫)项目,该开源框架于2008年 开始发布一款 Amoeba for Mysql软件 ...
-
centos -bash-4.1$ 不显示用户名路径
1.在Terminal输入: vi ~/.bash_profile 2.如果没有.bash_profile可以自己添加.然后往文件中添加如下内容: export PS1=’[\u@\h \W]$ 注意 ...
-
【学习笔记】【Foundation】集合Set
不可变集合 NSSet :集合元素无顺序,没有索引号,元素不可重复. NSSet在功能上可看做是NSArray的父集,它是一个更通用的类. NSSet包含如下常用方法: setByAddingObje ...
-
curl 命令详解(转)
命令事例 1.读取网页,将网页内容全部输出 curl http://www.linuxidc.com 2.保存网页.下载文件 以page.html命名下载网页:curl –o page.html ht ...
-
彻底弄懂JS的事件冒泡和事件捕获
先上结论:在事件执行流中有两种执行方式.一种是事件冒泡(即事件的执行顺序是从下往上执行的) ; 另一种是捕获(即事件的执行顺序是从上往下执行的); 阻止事件冒泡: return false; ...
-
NOIP-质因数分解
题目描述 已知正整数n是两个不同的质数的乘积,试求出较大的那个质数. 输入描述: 输入只有一行,包含一个正整数n. 输出描述: 输出只有一行,包含一个正整数p,即较大的那个质数. #include&l ...