在大型分布式系统中,采用事件监听机制是最常见的。如果Spark UI采用Scala的函数调用方式,由于函数调用多数情况下是同步调用,导致线程被阻塞。将函数调用更换为发送事件,事件的处理时异步的,当前线程可以继续执行后续逻辑,线程池中的线程还可以被重用,这样整个系统的并发度会大大增加。发送的事件会存入缓存,由定时调度器取出后,分配给监听此事件的监听器对监控数据进行更新。
DAGScheduler是主要的产生各类SparkListenerEvent的源头,它将各种SparkListenerEvent发送到ListenerBus的事件队列中,ListenerBus通过定时器将SparkListenerEvent事件匹配到具体的SparkListener,改变SparkListener中的统计监控数据,最终由SparkUI的界面展示。
listenerBus详解
listenerBus的类型是LiveListenerBus。LiveListenerBus实现了监听器模型,通过监听事件触发对各种监听器监听状态信息的修改,达到UI界面的数据刷新效果。LiveListenerBus继承自AsynchronousListenerBus,AsynchronousListenerBus由如下部分组成:
·事件阻塞队列:LinkedBlockingQueue[SparkListenerEvent],固定大小是10000;
·监听器数组:类型为ArrayBuffer[SparkListener],存放各类监听器SparkListener;
·事件匹配监听器的线程:此Thread不断拉取LinkedBlockingQueue中的事件,遍历监听器,调用监听器的方法。任何事件都会在LinkedBlockingQueue中存在一段时间,然后Thread处理了此事件后,会将其清除。
构造JobProgressListener
以JobProgressListener为例讲解SparkListener。通过监听ListenerBus中的事件更新任务进度。SparkStatusTracker和SparkUI实际上也是通过JobProgressListener来实现任务状态跟踪的。
JobProgressListener实现了onJobStart、onJobEnd、onStageCompleted、onStageSubmitted、onTaskStart、onTaskEnd等方法,这些方法是在listenerBus的驱动下,改变JobProgressListener中的各种Job、Stage相关的数据。
SparkUI的创建与初始化
private def create(
sc: Option[SparkContext],
conf: SparkConf,
listenerBus: SparkListenerBus,
securityManager: SecurityManager,
appName: String,
basePath: String = "",
jobProgressListener: Option[JobProgressListener] = None,
startTime: Long): SparkUI = {
val _jobProgressListener: JobProgressListener = {
val listener = new JobProgressListener(conf)
(listener)
listener
}
val environmentListener = new EnvironmentListener
val storageStatusListener = new StorageStatusListener
val executorsListener = new ExecutorsListener(storageStatusListener)
val storageListener = new StorageListener(storageStatusListener)
val operationGraphListener = new RDDOperationGraphListener(conf)
(environmentListener)
(storageStatusListener)
(executorsListener)
(storageListener)
(operationGraphListener)
new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
executorsListener, _jobProgressListener, storageListener, operationGraphListener,
appName, basePath, startTime)
}
在create方法里除了JobprogressListener是外部传入的之外,又增加了一些SparkListener。例如用于对JVM参数、Spark属性、Java系统属性、classpath等进行监控的EnvironmentListener;用于维护Executor的存储状态的StorageStatusListener;用于准备将Executor的信息展示在ExecutorTab的ExecutorListener;用于准备将Executor相关存储信息展示在BlockManagerUI的storageListener等。最后,创建SparkUI,SparkUI服务默认是可以杀掉 的,设置为false可以保证不被杀死。
Initialize方法会组织前端页面各个Tab和Page的展示及布局。
SparkUI的页面布局与展示
JobsTab展示所有Job的进度,状态信息,以它为例说明。JobsTab会复用SparkUI的killEnabled,SparkContext,jobProgressListener,包括AllJobsPage和JobPage两个页面。
private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
val sc =
val killEnabled =
val jobProgresslistener =
val executorListener =
val operationGraphListener =
def isFairScheduler: Boolean =
(_ == )
attachPage(new AllJobsPage(this))
attachPage(new JobPage(this))
}
AllJobsPage由render方法渲染,利用JobProgressListener中的统计监控数据生成激活、完成、失败等状态的Job摘要信息,并调用jobsTable方法生成表格等html元素,最终使用UIUtils的headerSparkPage封装好css、js、header及页面布局等。
上面的attachPage方法存在于JobsTab的父类WebUITab中,WebUITab维护有ArrayBuffer[WebUIPage]的数据结构,AllJobsPage和JobPage将被放入此ArrayBuffer中。
private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {
val pages = ArrayBuffer[WebUIPage]()
val name =
def attachPage(page: WebUIPage) {
= (prefix + "/" + ).stripSuffix("/")
pages += page
}
def headerTabs: Seq[WebUITab] =
def basePath: String =
}
JobsTab创建之后,被attachTab方法加入SparkUI的ArrayBuffer[WebUITab]中,并且通过attachPage方法,将每个page生成,最后调用attachHandler方法将ServletContextHandler绑定到SparkUI。
def attachTab(tab: WebUITab) {
(attachPage)
tabs += tab
}
def attachPage(page: WebUIPage) {
val pagePath = "/" +
val renderHandler = createServletHandler(pagePath,
(request: HttpServletRequest) => (request), securityManager, conf, basePath)
val renderJsonHandler = createServletHandler(("/") + "/json",
(request: HttpServletRequest) => (request), securityManager, conf, basePath)
attachHandler(renderHandler)
attachHandler(renderJsonHandler)
(page, ArrayBuffer[ServletContextHandler]())
.append(renderHandler)
}
def attachHandler(handler: ServletContextHandler) {
handlers += handler
{ info =>
(handler)
if (!) {
()
}
}
}
SparkUI的启动
SparkUI创建好后,需要调用父类WebUI的bind方法,绑定服务和端口。
def bind() {
assert(!, "Attempted to bind %s more than once!".format(className))
try {
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name))
logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort))
} catch {
case e: Exception =>
logError("Failed to bind %s".format(className), e)
(1)
}
}
startJettyServer是JettyUtils的静态方法,最终启动Jetty提供的服务。