LauncherBackend记录(spark-2.2.0)

时间:2022-10-19 20:51:02

LauncherBackend 代码记录

概述

//LauncherBackend 是与 LauncherServer 通信的客户端端点,负责向 LauncherServer 上报应用的状态变化。
//LauncherBackend 会被 org.apache.spark.deploy.yarn.Client 调用,具体细节将在其他部分记录。
1.代码记录
/**
 * Client-side endpoint that talks to the LauncherServer and reports application changes
 * (application id, state transitions) to it.
 *
 * A connection is only attempted when both the launcher port and the launcher secret are
 * present in the process environment; otherwise every method of this class is a no-op.
 */
private[spark] abstract class LauncherBackend {

  private var clientThread: Thread = _
  private var connection: BackendConnection = _
  // Last state that was sent, used by setState() to suppress duplicate notifications.
  private var lastState: SparkAppHandle.State = _
  @volatile private var _isConnected = false

  /**
   * Opens the socket to the LauncherServer, sends the initial `Hello` handshake and starts the
   * thread that runs the connection.
   *
   * The port and the secret are both read from environment variables. According to the original
   * author's notes, they are published by SparkLauncher and inherited by the spark-submit child
   * process it spawns through ProcessBuilder, which is how this backend detects that a
   * LauncherServer exists at all. When either variable is missing, this method does nothing.
   *
   * A malformed port value makes `toInt` throw a NumberFormatException.
   */
  def connect(): Unit = {
    // Port the LauncherServer listens on.
    val port = sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT).map(_.toInt)
    // Secret identifying this application to the LauncherServer.
    val secret = sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET)
    (port, secret) match {
      case (Some(launcherPort), Some(launcherSecret)) =>
        // The server is reached over the loopback address: per the original notes it only
        // listens there, and only when the job was submitted via SparkLauncher.startApplication.
        val s = new Socket(InetAddress.getLoopbackAddress(), launcherPort)
        connection = new BackendConnection(s)
        connection.send(new Hello(launcherSecret, SPARK_VERSION))
        clientThread = LauncherBackend.threadFactory.newThread(connection)
        clientThread.start()
        _isConnected = true

      case _ =>
        // No launcher server was advertised through the environment; nothing to connect to.
    }
  }

  /**
   * Closes the connection (if one was opened) and then waits for the connection thread to
   * finish. The join happens even when closing the connection throws.
   */
  def close(): Unit = {
    if (connection != null) {
      try {
        connection.close()
      } finally {
        if (clientThread != null) {
          clientThread.join()
        }
      }
    }
  }

  /** Sends the application id to the launcher; a no-op when no connection was opened. */
  def setAppId(appId: String): Unit = {
    if (connection != null) {
      connection.send(new SetAppId(appId))
    }
  }

  /**
   * Sends an application state change to the launcher. Nothing is sent when no connection was
   * opened or when `state` equals the last state that was sent.
   */
  def setState(state: SparkAppHandle.State): Unit = {
    if (connection != null && lastState != state) {
      connection.send(new SetState(state))
      lastState = state
    }
  }

  // The members below deal with disconnection and stop requests.

  /** Return whether the launcher handle is still connected to this backend. */
  def isConnected(): Boolean = _isConnected

  /**
   * Implementations should provide this method, which should try to stop the application
   * as gracefully as possible.
   */
  protected def onStopRequest(): Unit

  /**
   * Callback for when the launcher handle disconnects from this backend.
   */
  protected def onDisconnected() : Unit = { }

  /**
   * Runs `onStopRequest()` on a fresh thread obtained from the shared thread factory, wrapped
   * in `Utils.tryLogNonFatalError`.
   */
  private def fireStopRequest(): Unit = {
    val thread = LauncherBackend.threadFactory.newThread(new Runnable() {
      override def run(): Unit = Utils.tryLogNonFatalError {
        onStopRequest()
      }
    })
    thread.start()
  }

  /** Connection to the launcher; reacts to incoming messages and to disconnection. */
  private class BackendConnection(s: Socket) extends LauncherConnection(s) {

    /** Handles a message from the launcher: only `Stop` is accepted, anything else is an error. */
    override protected def handle(m: Message): Unit = m match {
      case _: Stop =>
        fireStopRequest()

      case _ =>
        throw new IllegalArgumentException(s"Unexpected message type: ${m.getClass().getName()}")
    }

    /**
     * Closes the underlying connection and notifies the backend through `onDisconnected()`.
     *
     * The connected flag is cleared first, so that `isConnected()` already reports `false`
     * while the callback runs and the flag cannot be left stale when `super.close()` or the
     * callback throws.
     */
    override def close(): Unit = {
      try {
        _isConnected = false
        super.close()
      } finally {
        onDisconnected()
      }
    }

  }

}

/** Companion object holding what is shared by all LauncherBackend instances. */
private object LauncherBackend {

  // Name handed to ThreadUtils.namedThreadFactory for the threads created by the backend.
  private val ThreadFactoryName = "LauncherBackend"

  /** Factory used for both the connection thread and the stop-request threads. */
  val threadFactory = ThreadUtils.namedThreadFactory(ThreadFactoryName)

}