LauncherBackend 代码记录
概述
//LauncherBackend 是跟LauncherServer通信的客户端,向LauncherServer发送状态变化的通信端点
//LauncherBackend 被org.apache.spark.deploy.yarn.Client 调用.具体细节将会在其他部分做记录。
1.代码记录
private[spark] abstract class LauncherBackend {
private var clientThread: Thread = _
private var connection: BackendConnection = _
private var lastState: SparkAppHandle.State = _
@volatile private var _isConnected = false
//这里进行连接LauncherServer的socket初始化动作,端口是从env中获取的,env里的端口是在SparkLauncher中通告出去的。详情参看SparkLauncher记录
def connect(): Unit = {
val port = sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT).map(_.toInt) //这里通过环境变量获取LauncherServer的端口号
val secret = sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET) //这里通过环境变量获取LauncherServer通信的唯一凭证
if (port != None && secret != None) {
/*
*这里建立跟LauncherServer通信的socket,ip是本地回环地址,
*因为只有通过SparkLauncher的startApplication的方式去提交spark 任务的时候LauncherServer才会在本地回环地址上建立监听
*因为SparkLauncher 通过ProcessBuilder的方式调用spark-submit,所以在spark-submit中会继承父进程的环境变量
*LauncherBackend才能通过环境变量确定是否存在LauncherServer服务
*/
val s = new Socket(InetAddress.getLoopbackAddress(), port.get)
connection = new BackendConnection(s)
connection.send(new Hello(secret.get, SPARK_VERSION))
clientThread = LauncherBackend.threadFactory.newThread(connection)
clientThread.start()
_isConnected = true
}
}
def close(): Unit = {
if (connection != null) {
try {
connection.close()
} finally {
if (clientThread != null) {
clientThread.join()
}
}
}
}
//这里发送appId
def setAppId(appId: String): Unit = {
if (connection != null) {
connection.send(new SetAppId(appId))
}
}
//这里发送app状态变化,具体的调用时机将在其他部分记录
def setState(state: SparkAppHandle.State): Unit = {
if (connection != null && lastState != state) {
connection.send(new SetState(state))
lastState = state
}
}
//下面是一些关闭连接,停止请求的事件函数
/** Return whether the launcher handle is still connected to this backend. */
def isConnected(): Boolean = _isConnected
/**
* Implementations should provide this method, which should try to stop the application
* as gracefully as possible.
*/
protected def onStopRequest(): Unit
/**
* Callback for when the launcher handle disconnects from this backend.
*/
protected def onDisconnected() : Unit = { }
private def fireStopRequest(): Unit = {
val thread = LauncherBackend.threadFactory.newThread(new Runnable() {
override def run(): Unit = Utils.tryLogNonFatalError {
onStopRequest()
}
})
thread.start()
}
private class BackendConnection(s: Socket) extends LauncherConnection(s) {
override protected def handle(m: Message): Unit = m match {
case _: Stop =>
fireStopRequest()
case _ =>
throw new IllegalArgumentException(s"Unexpected message type: ${m.getClass().getName()}")
}
override def close(): Unit = {
try {
super.close()
} finally {
onDisconnected()
_isConnected = false
}
}
}
}
private object LauncherBackend {
val threadFactory = ThreadUtils.namedThreadFactory("LauncherBackend")
}