Spark on Yarn客户端作业提交过程分析

时间:2022-09-09 08:34:43

Spark on Yarn 客户端模式作业提交过程分析

https://www.zybuluo.com/rickyChen/note/312098


  1. 我们将以一个Spark Streaming为例,阅读spark相关源码,简述Spark on Yarn客户端模式下作业提交流程。作业是通过spark-submit脚本提交的,因此整个流程从spark-submit代码开始分析。若有错误,希望各位看官指出。

    1. 通过submit获取提交代码的MainClass
    2. 通过反射机制Utils.classForName创建相关的类,并获取其中的mainMethod
    3. 通过反射调用直接调用上一步获得的mainMethod,开始运行作业的main方法
    4. 首先,新建一个SparkConf类,其中封装了Spark和Application相关配置信息。
    5. SparkConf和批处理间隔做给参数创建一个StreamingContext
    6. 在对StreamingContext初始化的过程中,调用构造器,新建一个SparkContext类.新建SparkContext的过程中,有以下步骤需要关注(以下步骤按顺序执行):
      • JobProgressListener
        作业流程监听器,可以获取整个Application运行流程中每个Stage、Job的具体信息。追踪task级别的信息,用作在UI上的展示。
      • createSparkEnv
        SparkConflistenerBus为参数调用createSparkEnv函数。其中,listenerBus是spark中的监听器,包括JobProgressListener。在createSparkEnv调用的过程中,将调用SparkEnv对象的createDriverEnv成员函数,在这个过程中会创建一个actorSystem和一个rpcEnv,生成一个driver,这将创建一个SparkEnv对象,SparkEnv对象中将封装诸如rpcEnvactorSystemcacheManagermapOutputTrackershuffleManagerbroadcastManagerblockManagermemoryManager 等成员类,成员类的作用如下:
        • mapOutputTracker
          跟踪一个stage map、output的位置。获取map、output的信息。driver、executor使用不同的HashMap存储元数据。
        • shuffleManager
          shuffleManager会在driver和每个executor中创建,我们可以通过spark.shuffle.manager来对shuffle进行配置,executor可以同过shuffleManager接口读写数据。
        • broadcastManager
          广播变量管理器
        • blockManager
          外部类与storage模块打交道都要通过调用BlockManager相应接口实现
        • memoryManager
          内存管理器,协调运行内存和存储内存,其中运行资源负责shuffles、 joins、sorts和aggregations,存储内存负责caching和扩散。每一个executor都有一个memoryManager
      • SparkStatusTracker
        低级API,SparkStatusTracker类方法将调用JobProgressListener类中的成员变量,SparkStatusTracker可以获得Application中Stage、Job的具体信息,但只提供最近几个Jobs/Stages信息。
      • HeartbeatReceiver
        运行在driver上的一个类,负责接受来自executor的心跳信息。
      • createTaskScheduler
        调用createTaskScheduler,返回两个对象:_schedulerBackend_taskScheduler,并创建_dagSchedulerDAGScheduler初始化完成之后,将调用_taskScheduler.start(),这一步主要进行了:
        • 新建一个ClientArguements类,封装一些Application中需要的资源相关的配置信息。
        • ClientArguements为参数,新建一个Client
        • 调用Client.submitApplication
        • 调用hadoop-yarn接口初始化yarnClient,从集群上申请一个Application,获取Application id,判断集群是否有足够资源,否则中断。向yarn集群申请一个Container运行ApplicationMaster,最后把整个Application提交到Yarn集群上运行。
  2. Reference

    https://github.com/apache/spark/tree/branch-1.4
    http://spark.apache.org/docs/1.4.1/running-on-yarn.html