Spark on Yarn 客户端模式作业提交过程分析
-
我们将以一个Spark Streaming为例,阅读spark相关源码,简述Spark on Yarn客户端模式下作业提交流程。作业是通过spark-submit脚本提交的,因此整个流程从spark-submit代码开始分析。若有错误,希望各位看官指出。
- 通过
submit
获取提交代码的MainClass
- 通过反射机制
Utils.classForName
创建相关的类,并获取其中的mainMethod
- 通过反射调用直接调用上一步获得的
mainMethod
,开始运行作业的main方法 - 首先,新建一个
SparkConf
类,其中封装了Spark和Application相关配置信息。 - 把
SparkConf
和批处理间隔做给参数创建一个StreamingContext
类 - 在对
StreamingContext
初始化的过程中,调用构造器,新建一个SparkContext
类.新建SparkContext
的过程中,有以下步骤需要关注(以下步骤按顺序执行):- JobProgressListener
作业流程监听器,可以获取整个Application运行流程中每个Stage、Job的具体信息。追踪task级别的信息,用作在UI上的展示。 - createSparkEnv
以SparkConf
和listenerBus
为参数调用createSparkEnv
函数。其中,listenerBus
是spark中的监听器,包括JobProgressListener
。在createSparkEnv
调用的过程中,将调用SparkEnv
对象的createDriverEnv
成员函数,在这个过程中会创建一个actorSystem
和一个rpcEnv
,生成一个driver
,这将创建一个SparkEnv
对象,SparkEnv
对象中将封装诸如rpcEnv
,actorSystem
,cacheManager
,mapOutputTracker
,shuffleManager
,broadcastManager
,blockManager
,memoryManager
等成员类,成员类的作用如下:- mapOutputTracker
跟踪一个stage map、output的位置。获取map、output的信息。driver、executor使用不同的HashMap存储元数据。 - shuffleManager
shuffleManager
会在driver和每个executor中创建,我们可以通过spark.shuffle.manager
来对shuffle进行配置,executor可以同过shuffleManager
接口读写数据。 - broadcastManager
广播变量管理器 - blockManager
外部类与storage模块打交道都要通过调用BlockManager
相应接口实现 - memoryManager
内存管理器,协调运行内存和存储内存,其中运行资源负责shuffles、 joins、sorts和aggregations,存储内存负责caching和扩散。每一个executor都有一个memoryManager
- mapOutputTracker
- SparkStatusTracker
低级API,SparkStatusTracker
类方法将调用JobProgressListener
类中的成员变量,SparkStatusTracker
可以获得Application中Stage、Job的具体信息,但只提供最近几个Jobs/Stages信息。 - HeartbeatReceiver
运行在driver
上的一个类,负责接受来自executor
的心跳信息。 - createTaskScheduler
调用createTaskScheduler
,返回两个对象:_schedulerBackend
、_taskScheduler
,并创建_dagScheduler
。DAGScheduler
初始化完成之后,将调用_taskScheduler.start()
,这一步主要进行了:- 新建一个
ClientArguements
类,封装一些Application中需要的资源相关的配置信息。 - 以
ClientArguements
为参数,新建一个Client
类 - 调用
Client.submitApplication
- 调用hadoop-yarn接口初始化
yarnClient
,从集群上申请一个Application,获取Application id,判断集群是否有足够资源,否则中断。向yarn集群申请一个Container运行ApplicationMaster
,最后把整个Application提交到Yarn集群上运行。
- 新建一个
- JobProgressListener
- 通过
-
Reference
https://github.com/apache/spark/tree/branch-1.4
http://spark.apache.org/docs/1.4.1/running-on-yarn.html