3. 创建并初始化Spark UI
任何系统都需要提供监控功能,用浏览器能访问具有样式及布局并提供丰富监控数据的页面无疑是一种简单、高效的方式。SparkUI就是这样的服务。
在大型分布式系统中,采用事件监听机制是最常见的。为什么要使用事件监听机制?假如SparkUI采用Scala的函数调用方式,那么随着整个集群规模的增加,对函数的调用会越来越多,最终会受到Driver所在JVM的线程数量限制而影响监控数据的更新,甚至出现监控数据无法及时显示给用户的情况。由于函数调用多数情况下是同步调用,这就导致线程被阻塞,在分布式环境中,还可能因为网络问题,导致线程被长时间占用。将函数调用更换为发送事件,事件的处理是异步的,当前线程可以继续执行后续逻辑,线程池中的线程还可以被重用,这样整个系统的并发度会大大增加。发送的事件会存入缓存,由定时调度器取出后,分配给监听此事件的监听器对监控数据进行更新。Spark UI的架构如下:
DAGScheduler是主要的产生各类SparkListenerEvent的源头,它将各种SparkListenerEvent发送到listenerBus的事件队列中,listenerBus通过定时器将SparkListenerEvent事件匹配到具体的SparkListener,改变SparkListener中的统计监控数据,最终由SparkUI的界面展示。从上图中可以看到Spark里定义了很多监听器SparkListener的实现,包括JobProgressListener、EnvironmentListener、StorageListener、ExecutorsListener,它们的类继承体系如图:
3.1 listenerBus详解
listenerBus的类型是LiveListenerBus,继承listenerBus类。LiveListenerBus实现了监听器模型,通过监听事件触发对各种监听器监听状态信息的修改,达到UI界面的数据刷新效果。LiveListenerBus由以下部分组成:
- 事件阻塞队列:类型为LinkedBlockingQueue[SparkListenerEvent],固定大小是10000;
- 监听器数组:类型为CopyOnWriteArrayList[SparkListener],存放各类监听器SparkListener。
- 事件匹配监听器的线程:此listenerThread不断拉取LinkedBlockingQueue中的事件,遍历监听器,调用监听器的方法。任何事件都会在LinkedBlockingQueue中存在一段时间,然后listenerThread处理了此事件后,会将其清除。因此使用listenerBus这个名字再合适不过了,到站就下车。listenerBus的实现见代码如下:
LiveListenerBus中调用的postToAll方法实际定义在父类SparkListenerBus的父类ListenerBus中,SparkListenerBus继承ListenerBus,而LiveListenerBus继承SparkListenerBus,ListenerBus的postToAll方法代码如下:
doPostEvent方法在SparkListenerBus中被重写,代码如下:
3.2 构造JobProgressListener
我们以JobProgressListener为例来讲解SparkListener。JobProgressListener是SparkContext中一个重要的组成部分,通过监听listenerBus中的事件更新任务进度。SparkUI实际上也是通过JobProgressListener来实现任务状态跟踪的。创建JobProgressListener的代码如下:
JobProgressListener的作用是通过HashMap、ListBuffer等数据结构存储JobId及对应的JobUIDate信息,并按照激活、完成、失败等job状态统计。对于StageId、StageInfo等信息按照激活、完成、忽略、失败等Stage状态统计,并且存储StageId与JobId的一对多关系。这些统计信息最终会被JobPage和StagePage等页面访问和渲染。JobProgressListener的数据结构见代码如下:
JobProgressListener实现了onJobStart、onJobEnd、onStageCompleted、onStageSubmitted、onTaskStart、onTaskEnd等方法,这些方法正是在listenerBus的驱动下,改变JobProgressListener中的各种Job、Stage相关的数据。
3.3 SparkUI的创建与初始化
SparkUI的创建,见代码:
可以看到如果不需要提供SparkUI服务,可以将属性spark.ui.enabled修改为false。其中createLiveUI实际是调用了create方法,见代码:
create方法的实现见代码:
根据上述代码,可以知道在create方法里除了JobProgressListener是外部出入的之外,又增加了一些SparkListener。例如,用于对JVM参数、Spark属性、Java系统属性、classpath等进行监控的EnvironmentListener;用于维护Executor的存储状态的 StorageStatusListener;用于准备将Executor的信息展示在ExecutorsTab的ExecutorsListener;用于准备将Executor相关存储信息展示在BlockManagerUI的StorageListener等。最后创建SparkUI,SparkUI服务默认是可以被杀掉的,通过修改属性spark.ui.killEnabled为false可以保证不被杀死。initialize方法会组织前端页面各个Tab和Page的展示及布局,代码如下:
3.4 Spark UI的页面布局与展示
SparkUI究竟是如何实现页面布局及展示的?JobsTab展示所有Job的进度、状态信息,这里我们以它为例来说明。JobsTab会复用SparkUI的killEnabled、SparkContext、jobProressListener,包括AllJobsPage和JobPage两个页面,jobsTab的实现见代码:
AllJobsPage由render方法渲染,利用jobProgressListener中的统计监控数据生成激活、完成、失败等状态的Job摘要信息,并调用jobsTable方法生成表格等html元素,最终使用UIUtils的headerSparkPage封装好css、js、header及页面布局等,AllJobsPage的render方法见代码如下:
jobsTable用来生成表格数据,见代码如下:
JobsTab创建之后,将被方法attachTab方法加入SparkUI的ArrayBuffer[WebUITab]中,并且通过attachPage方法,给每一个page生成org.eclipse.jetty.servlet.ServletContextHandler,最后调用attachHandler方法将ServletContextHandler绑定到SparkUI,即加入到handlers:ArrayBuffer[ServletContextHandler]和样例类ServerInfo的rootHandler(ContextHandlerCollection)中。SparkUI继承自WebUI,attachTab方法在WebUI中实现,代码如下:
上述代码所在的类中使用import org.apache.spark.ui.JettyUtils._导入了JettyUtils的静态方法,所以createServletHandler方法实际是JettyUtils的静态方法createServletHandler。createServletHandler实际创建了javax.servlet.http.HttpServlet的匿名内部类实例,此实例实际使用(request: HttpServletRequest)=>page.render(request)函数参数来处理请求,进而渲染页面呈现给用户。
3.5 SparkUI的启动
SparkUI创建好后,需要调用父类WebUI的bind方法,绑定服务和端口,bind方法中主要的代码实现如下:
通过JettyUtils的静态方法startJettyServer启动了Jetty提供的服务,默认端口是4040。
查看:Spark源码剖析——SparkContext的初始化
参考资料:
《深入理解Spark核心思想与源码分析》