zeppelin源码初探
主要看整个运行的流程。
1.组件部分
Paragraph代表着一段代码以及支撑其执行所需要的“环境信息”,是代码执行的最小单位。Paragraph的职责如下:
1. 获取代码文本,并解析分离类似%Spark的interpreter声明段和可执行代码段。
2. 代码执行,以及执行过程控制(进度和终止)
3. 代码执行结果获取
4. 代码中变量查找以及替换
Note是单个’记事本’的内存对象,是zeppelin管理的最小单位,无论是做权限控制、共享、还是持久化,都是以Note为粒度的。从类关系上看,Note是由一些列的有序Paragraph组成,因此其绝大部分职责都是与管理Paragraph有关:
Notebook实际上是Note的Manager,职责如下:
1. Note的CRUD,克隆、导入/导出
2. Note和相关Interpreter配置时和运行时映射关系维护
3. Note cron式调度执行控制
NotebookServer的主要功能是将Notebook、Note、Paragraph、Interpreter等类封装的能力,通过WebSocket的形式对web 客户端提供出去,所以其具体的职责包括:
1. 维护WebSocket连接与Note之间映射关系
2. 处理客户端和服务器之间的双向通信(通过WebSocket,具体的通信协议见:Message类),包括消息的序列化/反序列化,消息解析和服务端处理、处理结果的向客户端广播/单播发送等。
3. Note的CRUD操作以及Paragraph的CRUD操作、执行、导入、导出时的权限控制
4. 前后端AngularObject的双向bind处理
5. WebSocket客户端合法性校验(checkOrigin)
ZeppelinServer是各个组件的”组装者”,它是系统的主入口,职责如下:
1. 内嵌jetty服务器,支持以WebSocket和REST两种方式对外暴露系统功能
2. 创建NotebookServer实例,建立起处理WebSocket Connection和消息处理的服务端
3. 创建Notebook需要的相关依赖,如Note持久化服务(NotebookRepo)、Note的全文索引服务(SearchService),并完成向Note、Paragraph的注入。
4. Note权限配置文件的加载以及初始化
5. InterpreterFactory的初始化
6. 初始化动态依赖加载器(DependencyResolver)
2.运行部分
-
当我们启动zeppelin-daemon.sh start时:
- 这个脚本里面定义了这样一个ZEPPELIN_MAIN=org.apache.zeppelin.server.ZeppelinServer变量。而ZeppelinServer是各个组件的”组装者”,它是系统的主入口
- 在ZeppelinServer中启动了一系列的服务,包括web
NotebookServer是一个webSocket,在他的onMessage方法中,根据不同的消息类型,调用不同的方法
当我们运行一个mysql的select语句时,onMessage收到的消息是:消息是一个json的字符串,op即为我们的操作(运行这个代码块),data包含了一些信息。
{
"op": "RUN_PARAGRAPH",
"data": {
"id": "20170721-094746_2107963913",
"paragraph": " paragraph=%mysql\nuseshixun2;\nselect*fromremindlimit10;",
"config": {
"colWidth": 12,
"enabled": true,
"results": {},
"editorSetting": {
"language": "sql",
"editOnDblClick": false
},
"editorMode": "ace/mode/sql"
},
"params": {}
},
"principal": "admin",
"ticket": "320ef21b-4f40-4779-8f09-ef9895de98d7",
"roles": "[admin]"
}- 接下来将json字符串解析成一个Message,并依据op进行操作
Message messagereceived = deserializeMessage(msg);
......
case RUN_PARAGRAPH:
runParagraph(conn, userAndRoles, notebook, messagereceived);
break;
case RUN_ALL_PARAGRAPHS:
runAllParagraphs(conn, userAndRoles, notebook, messagereceived);
break;- 在runParagraph中,会将信息再次进行封装成一个paragraph.
-
runParagraph:
- Message里面包含paragraphId,得到
- 得到note,Note是由一些列的有序Paragraph组成
- 调用Note.run方法里面得到解释器
-
执行流程(解释run的流程):
NotebookServer.java中根据消息的op调用runParagraph,它调用Note的run方法,Note的run方法得到解释器的scheduler,提交任务。RemoteScheduler将放入job队列,唤醒执行线程。具体的执行下面会继续描述。NotebookServer.java
runParagraph(conn, userAndRoles, notebook, messagereceived);
persistAndExecuteSingleParagraph(......){
note.run(p.getId());
}
Note.java
public void run(String paragraphId){
Paragraph p = getParagraph(paragraphId);
p.setListener(jobListenerFactory.getParagraphJobListener(this));
String requiredReplName = p.getRequiredReplName();
//这个replName在传过来的json数据里面包含了,如果我们没有加%mysql,即replName为空,那么getInterpreter会加载默认的解释器
Interpreter intp = factory.getInterpreter(p.getUser(), getId(), requiredReplName);
......
intp.getScheduler().submit(p);
}
RemoteScheduler.java
public void submit(Job job) {
if (terminate) {
throw new RuntimeException("Scheduler already terminated");
}
job.setStatus(Status.PENDING);
synchronized (queue) {
queue.add(job);
queue.notify();
}
}
public void run() {
......
queue.wait(500);
}intp.getScheduler().submit(p);提交的是一个paragraph,而paragraph继承Job,实现了jobRun()方法
queue添加了一个job后,notify唤醒了一个线程,这应该是一个执行线程。在run方法中,执行一个job后,有queue.wait(500)操作。
那么这个run实际应该是paragraph的run方法。在paragraph的run方法如下:
这个反方法首先根据replName得到了对应的解释器,并调用了解释器interpret方法,这个方法是具体的执行过程,最后返回运行结果。
从这来看,所有的运行结果都是返回给了server端,并不是直接返回给客户端protected Object jobRun() throws Throwable {
String replName = getRequiredReplName();
Interpreter repl = getRepl(replName);
......
try {
InterpreterContext context = getInterpreterContext();
InterpreterContext.set(context);
InterpreterResult ret = repl.interpret(script, context);
List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
resultMessages.addAll(ret.message());
InterpreterResult res = new InterpreterResult(ret.code(), resultMessages);
......
return res;
} -
解释器(以jdbc为例):上面已经提到,interpret方法在paragraph.run方法中被调用,并将结果返回
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
......
return executeSql(propertyKey, cmd, contextInterpreter);
}
private InterpreterResult executeSql(String propertyKey, String sql,
InterpreterContext interpreterContext) {
......
}cmd=”use shixun2;\nselect * from remind limit 10;”
在executeSql方法中: sql为要执行的代码。最后返回的是这个InterpreterResult包含了查询的结果,即interpret返回了最终得结果给了server端。 -
得到解释器:
一个note有一系列的解释器的对应关系存储在interpreterFactory中。如果我们的paragraph开始时加了 %解释器名,那么这个replName会同正文一起发送到server端,paragraph在通过factory getInterpreter的时候会根据这个replName来加载对应的解释器。相反,如果我们没有加 %解释器名,那么replName为空,从下面的代码我们可以看出默认的解释器是此时这个note对应的一系列解释器的第一个,即settings.get(0)。其实我们从UI界面上也可以看出累,default解释器位于第一位。InterpreterFactory.java
if (replName == null || replName.trim().length() == 0) {
// get default settings (first available)
// TODO(jl): Fix it in case of returning null
InterpreterSetting defaultSettings = interpreterSettingManager
.getDefaultInterpreterSetting(settings);
return createOrGetInterpreterList(user, noteId, defaultSettings).get(0);
}
InterpreterSettingManager.java
public InterpreterSetting getDefaultInterpreterSetting(List<InterpreterSetting> settings) {
if (settings == null || settings.isEmpty()) {
return null;
}
return settings.get(0);
}那么这些的解释器又是何时形成的呢?
在InterpreterFactory.java 有这样一个属性InterpreterSettingManager interpreterSettingManager,在创建这个factory的时候他就被初始化了
在InterpreterSettingManager.java 中有这样的属性:
Map < String, List> interpreterBindings:key为noteID,value为interpreterID的集合。
Map < String, InterpreterSetting> interpreterSettings:key为interpreterID,value为InterpreterSetting同时在InterpreterSettingManager构造函数中调用了init这个方法,在init中调用了loadFromFile这个方法,在这个方法中去读取了文件,应该在这个文件中会加载所有的解释器。但是我现在debug不到InterpreterFactory和InterpreterSettingManager 的构造方法,也进不到loadFromFile这个方法内,所以对于实际加载哪一个文件还不是很清楚。