zeppelin源码分析——server端

时间:2021-04-28 20:46:00

zeppelin源码初探

主要看整个运行的流程。

1.组件部分

Paragraph代表着一段代码以及支撑其执行所需要的“环境信息”,是代码执行的最小单位。Paragraph的职责如下:
1. 获取代码文本,并解析分离类似%Spark的interpreter声明段和可执行代码段。
2. 代码执行,以及执行过程控制(进度和终止)
3. 代码执行结果获取
4. 代码中变量查找以及替换

Note是单个’记事本’的内存对象,是zeppelin管理的最小单位,无论是做权限控制、共享、还是持久化,都是以Note为粒度的。从类关系上看,Note是由一些列的有序Paragraph组成,因此其绝大部分职责都是与管理Paragraph有关:

Notebook实际上是Note的Manager,职责如下:
1. Note的CRUD,克隆、导入/导出
2. Note和相关Interpreter配置时和运行时映射关系维护
3. Note cron式调度执行控制

NotebookServer的主要功能是将Notebook、Note、Paragraph、Interpreter等类封装的能力,通过WebSocket的形式对web 客户端提供出去,所以其具体的职责包括:
1. 维护WebSocket连接与Note之间映射关系
2. 处理客户端和服务器之间的双向通信(通过WebSocket,具体的通信协议见:Message类),包括消息的序列化/反序列化,消息解析和服务端处理、处理结果的向客户端广播/单播发送等。
3. Note的CRUD操作以及Paragraph的CRUD操作、执行、导入、导出时的权限控制
4. 前后端AngularObject的双向bind处理
5. WebSocket客户端合法性校验(checkOrigin)

ZeppelinServer是各个组件的”组装者”,它是系统的主入口,职责如下:
1. 内嵌jetty服务器,支持以WebSocket和REST两种方式对外暴露系统功能
2. 创建NotebookServer实例,建立起处理WebSocket Connection和消息处理的服务端
3. 创建Notebook需要的相关依赖,如Note持久化服务(NotebookRepo)、Note的全文索引服务(SearchService),并完成向Note、Paragraph的注入。
4. Note权限配置文件的加载以及初始化
5. InterpreterFactory的初始化
6. 初始化动态依赖加载器(DependencyResolver)

2.运行部分

  1. 当我们启动zeppelin-daemon.sh start时:

    • 这个脚本里面定义了这样一个ZEPPELIN_MAIN=org.apache.zeppelin.server.ZeppelinServer变量。而ZeppelinServer是各个组件的”组装者”,它是系统的主入口
    • 在ZeppelinServer中启动了一系列的服务,包括web
    • NotebookServer是一个webSocket,在他的onMessage方法中,根据不同的消息类型,调用不同的方法

    • 当我们运行一个mysql的select语句时,onMessage收到的消息是:消息是一个json的字符串,op即为我们的操作(运行这个代码块),data包含了一些信息。

    {
    "op": "RUN_PARAGRAPH",
    "data": {
    "id": "20170721-094746_2107963913",
    "paragraph": " paragraph=%mysql\nuseshixun2;\nselect*fromremindlimit10;",
    "config": {
    "colWidth": 12,
    "enabled": true,
    "results": {},
    "editorSetting": {
    "language": "sql",
    "editOnDblClick": false
    }
    ,
    "editorMode": "ace/mode/sql"
    }
    ,
    "params": {}
    }
    ,
    "principal": "admin",
    "ticket": "320ef21b-4f40-4779-8f09-ef9895de98d7",
    "roles": "[admin]"
    }
    • 接下来将json字符串解析成一个Message,并依据op进行操作
     Message messagereceived = deserializeMessage(msg);

    ......
    case RUN_PARAGRAPH:
    runParagraph(conn, userAndRoles, notebook, messagereceived);
    break;
    case RUN_ALL_PARAGRAPHS:
    runAllParagraphs(conn, userAndRoles, notebook, messagereceived);
    break;
    • 在runParagraph中,会将信息再次进行封装成一个paragraph.
  2. runParagraph:

    • Message里面包含paragraphId,得到
    • 得到note,Note是由一些列的有序Paragraph组成
    • 调用Note.run方法里面得到解释器
  3. 执行流程(解释run的流程):
    NotebookServer.java中根据消息的op调用runParagraph,它调用Note的run方法,Note的run方法得到解释器的scheduler,提交任务。RemoteScheduler将放入job队列,唤醒执行线程。具体的执行下面会继续描述。

    NotebookServer.java
    runParagraph(conn, userAndRoles, notebook, messagereceived);
    persistAndExecuteSingleParagraph(......){
    note.run(p.getId());
    }

    Note.java
    public void run(String paragraphId){
    Paragraph p = getParagraph(paragraphId);
    p.setListener(jobListenerFactory.getParagraphJobListener(this));

    String requiredReplName = p.getRequiredReplName();
    //这个replName在传过来的json数据里面包含了,如果我们没有加%mysql,即replName为空,那么getInterpreter会加载默认的解释器
    Interpreter intp = factory.getInterpreter(p.getUser(), getId(), requiredReplName);

    ......
    intp.getScheduler().submit(p);

    }

    RemoteScheduler.java
    public void submit(Job job) {
    if (terminate) {
    throw new RuntimeException("Scheduler already terminated");
    }
    job.setStatus(Status.PENDING);

    synchronized (queue) {
    queue.add(job);
    queue.notify();
    }
    }

    public void run() {
    ......
    queue.wait(500);
    }

    intp.getScheduler().submit(p);提交的是一个paragraph,而paragraph继承Job,实现了jobRun()方法
    queue添加了一个job后,notify唤醒了一个线程,这应该是一个执行线程。在run方法中,执行一个job后,有queue.wait(500)操作。
    那么这个run实际应该是paragraph的run方法。在paragraph的run方法如下:
    这个反方法首先根据replName得到了对应的解释器,并调用了解释器interpret方法,这个方法是具体的执行过程,最后返回运行结果。
    从这来看,所有的运行结果都是返回给了server端,并不是直接返回给客户端

    protected Object jobRun() throws Throwable {
    String replName = getRequiredReplName();
    Interpreter repl = getRepl(replName);
    ......
    try {
    InterpreterContext context = getInterpreterContext();
    InterpreterContext.set(context);
    InterpreterResult ret = repl.interpret(script, context);
    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
    resultMessages.addAll(ret.message());

    InterpreterResult res = new InterpreterResult(ret.code(), resultMessages);

    ......
    return res;

    }
  4. 解释器(以jdbc为例):上面已经提到,interpret方法在paragraph.run方法中被调用,并将结果返回

    public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
    ......
    return executeSql(propertyKey, cmd, contextInterpreter);
    }

    private InterpreterResult executeSql(String propertyKey, String sql,
    InterpreterContext interpreterContext) {
    ......
    }

    cmd=”use shixun2;\nselect * from remind limit 10;”
    在executeSql方法中: sql为要执行的代码。最后返回的是这个InterpreterResult包含了查询的结果,即interpret返回了最终得结果给了server端。

  5. 得到解释器:
    一个note有一系列的解释器的对应关系存储在interpreterFactory中。如果我们的paragraph开始时加了 %解释器名,那么这个replName会同正文一起发送到server端,paragraph在通过factory getInterpreter的时候会根据这个replName来加载对应的解释器。相反,如果我们没有加 %解释器名,那么replName为空,从下面的代码我们可以看出默认的解释器是此时这个note对应的一系列解释器的第一个,即settings.get(0)。其实我们从UI界面上也可以看出累,default解释器位于第一位。

    InterpreterFactory.java
    if (replName == null || replName.trim().length() == 0) {
    // get default settings (first available)
    // TODO(jl): Fix it in case of returning null
    InterpreterSetting defaultSettings = interpreterSettingManager
    .getDefaultInterpreterSetting(settings);
    return createOrGetInterpreterList(user, noteId, defaultSettings).get(0);
    }

    InterpreterSettingManager.java
    public InterpreterSetting getDefaultInterpreterSetting(List<InterpreterSetting> settings) {
    if (settings == null || settings.isEmpty()) {
    return null;
    }
    return settings.get(0);
    }

    那么这些的解释器又是何时形成的呢?

    在InterpreterFactory.java 有这样一个属性InterpreterSettingManager interpreterSettingManager,在创建这个factory的时候他就被初始化了

    在InterpreterSettingManager.java 中有这样的属性:
    Map < String, List> interpreterBindings:key为noteID,value为interpreterID的集合。
    Map < String, InterpreterSetting> interpreterSettings:key为interpreterID,value为InterpreterSetting

    同时在InterpreterSettingManager构造函数中调用了init这个方法,在init中调用了loadFromFile这个方法,在这个方法中去读取了文件,应该在这个文件中会加载所有的解释器。但是我现在debug不到InterpreterFactory和InterpreterSettingManager 的构造方法,也进不到loadFromFile这个方法内,所以对于实际加载哪一个文件还不是很清楚。