spark源码阅读--SparkContext启动过程

时间:2024-08-06 12:06:14

##SparkContext启动过程
基于spark 2.1.0  scala 2.11.8
spark源码的体系结构实在是很庞大,从使用spark-submit脚本提交任务,到向yarn申请容器,启动driver进程,启动executor进程,到任务调度,shuffle过程等等,模块众多,而且每个模块都很大,所以要全部看完啃透几乎不可能,一是经历不允许,而是有些边缘性的模块主要起到辅助的功能,没有什么高深的技术含量,花时间性价比不高。因此我决定略去前面提交任务,向yarn提交任务,申请资源,启动容器的部分,以用户代码的运行为切入点,从SparkContext开始看起,当然spark从1.6之后一直以SparkSession作为用户编程的主要api,但是SparkSession实际仅仅是对SparkContext,SQLContext等入口对象的进一步分装,而涉及到spark核心模块的还是SparkContext。

###SparkContext启动流程
启动流程在SparkContext的初始化块中,在scala类中,可以直接在类作用域内执行一些代码块,这些代码块的作用就相当于java中类的实例初始化块,在实例初始化时被调用,因此一般会有一些初始化的逻辑在这里。我们看一下SparkContext类,除了前面几个内部常量成员的初始化,之后就是这段try块的执行了。感觉这点也是scala语法的一个缺点,java中借助IDE可以很快找到类的初始化代码,而iDEA对scala类的这种代码块并没有特别的突出,因此需要看代码的时候很仔细,否则很有可能就遗漏了一些重要的代码。

try {
    // 引用克隆,与外部传进来的conf隔离
    _conf = config.clone()
    _conf.validateSettings()

if (!_conf.contains("spark.master")) {
      throw new SparkException("A master URL must be set in your configuration")
    }
    if (!_conf.contains("spark.app.name")) {
      throw new SparkException("An application name must be set in your configuration")
    }

// log out spark.app.name in the Spark driver logs
    logInfo(s"Submitted application: $appName")

// System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
    if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
      throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
        "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
    }

if (_conf.getBoolean("spark.logConf", false)) {
      logInfo("Spark configuration:\n" + _conf.toDebugString)
    }

// Set Spark driver host and port system properties. This explicitly sets the configuration
    // instead of relying on the default value of the config constant.
    _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
    _conf.setIfMissing("spark.driver.port", "0")

_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

_jars = Utils.getUserJars(_conf)
    _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
      .toSeq.flatten

_eventLogDir =
      if (isEventLogEnabled) {
        val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
          .stripSuffix("/")
        Some(Utils.resolveURI(unresolvedDir))
      } else {
        None
      }

_eventLogCodec = {
      val compress = _conf.getBoolean("spark.eventLog.compress", false)
      if (compress && isEventLogEnabled) {
        Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
      } else {
        None
      }
    }

// 事件总线
    _listenerBus = new LiveListenerBus(_conf)

// Initialize the app status store and listener before SparkEnv is created so that it gets
    // all events.
    // 应用状态存储, 在SparkEnv创建之前将监听器加到事件注册到事件总线中,以能够监听到SparkEnv的事件
    _statusStore = AppStatusStore.createLiveStore(conf)
    listenerBus.addToStatusQueue(_statusStore.listener.get)

// Create the Spark execution environment (cache, map output tracker, etc)
    // 创建SparkEnv,是spark的重要的基础设施
    _env = createSparkEnv(_conf, isLocal, listenerBus)
    SparkEnv.set(_env)

// If running the REPL, register the repl's output dir with the file server.
    _conf.getOption("spark.repl.class.outputDir").foreach { path =>
      val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
      _conf.set("spark.repl.class.uri", replUri)
    }

_statusTracker = new SparkStatusTracker(this, _statusStore)

_progressBar =
      if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
        Some(new ConsoleProgressBar(this))
      } else {
        None
      }

_ui =
      if (conf.getBoolean("spark.ui.enabled", true)) {
        Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
          startTime))
      } else {
        // For tests, do not enable the UI
        None
      }
    // Bind the UI before starting the task scheduler to communicate
    // the bound port to the cluster manager properly
    // 启动spark ui
    _ui.foreach(_.bind())

// 创建hadoop config配置,以spark.hadoop开头的配置
    _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

// Add each JAR given through the constructor
    // 添加spark.jars参数指定的文件
    if (jars != null) {
      jars.foreach(addJar)
    }

// 添加spark.files参数指定的文件
    if (files != null) {
      files.foreach(addFile)
    }

// 可以看出参数的取值顺序
    _executorMemory = _conf.getOption("spark.executor.memory")
      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
      .orElse(Option(System.getenv("SPARK_MEM"))
        .map(warnSparkMem))
      .map(Utils.memoryStringToMb)
      .getOrElse(1024)

// Convert java options to env vars as a work around
    // since we can't set env vars directly in sbt.
    for {(envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
         value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
      executorEnvs(envKey) = value
    }
    Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
      executorEnvs("SPARK_PREPEND_CLASSES") = v
    }
    // The Mesos scheduler backend relies on this environment variable to set executor memory.
    // TODO: Set this only in the Mesos scheduler.
    executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
    executorEnvs ++= _conf.getExecutorEnv
    executorEnvs("SPARK_USER") = sparkUser

// We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
    // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
    // 必须要在创建TaskScheduler之前先注册HeartbeatReceiver,
    // 因为Executor的构造方法参数中需要HeartbeatReceiver
    _heartbeatReceiver = env.rpcEnv.setupEndpoint(
      HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

// Create and start the scheduler
    // 创建并启动调度器,包括调度后端,任务调度器
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    // 创建DAG调度器
    _dagScheduler = new DAGScheduler(this)
    // 端点引用,类似于RPC中的stup,调用端
    // 发送一个
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()

_applicationId = _taskScheduler.applicationId()
    _applicationAttemptId = taskScheduler.applicationAttemptId()
    _conf.set("spark.app.id", _applicationId)
    if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
      System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
    }
    _ui.foreach(_.setAppId(_applicationId))
    _env.blockManager.initialize(_applicationId)

// The metrics system for Driver need to be set spark.app.id to app ID.
    // So it should start after we get app ID from the task scheduler and set spark.app.id.
    _env.metricsSystem.start()
    // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
    _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

// 事件记录器,监听事件总线上的事件,将事件记录到日志中
    _eventLogger =
      if (isEventLogEnabled) {
        val logger =
          new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
            _conf, _hadoopConfiguration)
        logger.start()
        listenerBus.addToEventLogQueue(logger)
        Some(logger)
      } else {
        None
      }

// Optionally scale number of executors dynamically based on workload. Exposed for testing.
    // 动态申请executor
    val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
    _executorAllocationManager =
      if (dynamicAllocationEnabled) {
        schedulerBackend match {
          case b: ExecutorAllocationClient =>
            Some(new ExecutorAllocationManager(
              schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
          case _ =>
            None
        }
      } else {
        None
      }
    _executorAllocationManager.foreach(_.start())

// 清理器,用于清理RDD,shuffle,广播变量等的状态
    _cleaner =
      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
      } else {
        None
      }
    _cleaner.foreach(_.start())

// 添加通过spark.extraListeners参数设置的监听器,然后启动事件总线
    setupAndStartListenerBus()
    // 投递一个环境更新的事件到事件总线中
    postEnvironmentUpdate()
    // 投递一个应用启动的事件到事件总线中
    postApplicationStart()

// Post init
    // 调用任务调度器的前置方法
    _taskScheduler.postStartHook()
    // 添加对DAG调度器的度量源,主要用于获取任务数,运行的任务数,成功,失败,运行的stage数目
    _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
    // 添加对块管理器的度量源,主要用于统计内存使用量
    _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
    // 添加对executor申请管理器的度量源,主要用于统计申请的executor的统计信息
    _executorAllocationManager.foreach { e =>
      _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
    }

// Make sure the context is stopped if the user forgets about it. This avoids leaving
    // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
    // is killed, though.
    logDebug("Adding shutdown hook") // force eager creation of logger
    // 添加关闭SparkContext的钩子
    _shutdownHookRef = ShutdownHookManager.addShutdownHook(
      ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
      logInfo("Invoking stop() from shutdown hook")
      stop()
    }
  } catch {
    case NonFatal(e) =>
      logError("Error initializing SparkContext.", e)
      try {
        stop()
      } catch {
        case NonFatal(inner) =>
          logError("Error stopping SparkContext after init error.", inner)
      } finally {
        throw e
      }
  }

总结一下这段代码的主要逻辑:

* 处理配置参数
* ***创建事件总线LiveListenerBus,用于发布事件,监听事件***
* 创建程序状态存储器AppStatusStore,是一个KV存储的包装类。并且将与此存储器关联的监听器添加到事件总线的appStatus队列中,监听appStatus类型的事件
* ***创建SparkEnv对象,这个对象是spark的执行环境,是spark中最重要的类之一,内部分装了块管理器,shuffle管理器,map输出跟踪器,广播管理器,内存管理器等重要的基础设施,是spark运行的基石。***
* 创建状态跟踪器,用于跟踪job和stage的执行情况,
* 创建控制台进度条,用于往控制台打印stage运行进度信息
* 创建ui对象,用于提供web页面访问服务
* 将spark.jars和spark.files添加到NettyStreamManager中以提供文件下载服务,executor会通过rpc下载这些文件
* 设置executor的一些环境变量,
* 创建心跳接收器,并创建一个端点引用
* ***创建调度后端和任务调度器,以常用的yarn cluster模式为例,创建的是YarnClusterScheduler和YarnClusterSchedulerBackend***
* ***创建DAG调度器,dag调度器运行任务是通过向任务调度器提交任务实现的***
* 启动任务调度器,内部启动了调度后端
* 初始化块管理器,启动度量系统,
* 创建动态申请资源的管理器,如果没有启用动态资源,那么这一项为空
* 创建清理器,用于清理RDD,shuffle,广播变量等的状态
* 向度量系统注册三个度量源,分别是DAG调度器度量源,块管理器度量源,动态资源申请管理器度量源
* 最后添加一个程序退出的钩子函数,用于在程序退出时关闭SparkContext

其中斜体加粗的几个模块是重点模块。后面会从一次RDD行动算子运行为切入点,分析在一次完整的spark任务运行过程中涉及到那些模块,以及各个模块之间是如何协同配合,在这个过程中逐步弄清楚每个模块的作用以及内部运行机制,任重而道远啊!!