接上篇,本文接着分析zeppelin类图中右上角剩余的类,同样,在分析的过程中,我们重点关注该class的职责划分,以及与其他类配合,完成zeppelin的设计目标的过程。
InterpreterInfoSaving
InterpreterInfoSaving是一个convenient类(提供的功能可以由其他类组合完成,设计目的是为了方便调用),定义其目的就是为了将原来分散存储在各个interpreter子文件夹中的InterpreterSetting、intepreterBindings和interpreterRepositories集中到一处,方便进行查找和持久化。
原来Interpter的注册方式是static注册方式:即每个Interpreter的实现类都定义一段static初始化段,在该类被第一次加载的时候,将其注册到zeppelin中。例如:下面是Python interpreter的注册代码。
static {
Interpreter.register(
"python",
"python",
PythonInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add(ZEPPELIN_PYTHON, DEFAULT_ZEPPELIN_PYTHON,
"Python directory. Default : python (assume python is in your $PATH)")
.build()
);
}
现在这种方式已经Deprecated了,新的Interpreter采用了json配置文件的方式来保存这些配置,并且由zeppelin加载时来解析这些配置,该文件约定为interpreter-setting.json,该文件可以放到如下2种位置:
1. 作为main resources打包到interpreter的jar包中
2. 将其放到${ZEPPELIN_HOME}/interpreter/{interpreter}/interpreter-setting.json,位置下
interpreter-settings.json就近与interpreter的jar包存储在一起,符合封装的原则,但是zeppelin需要知道全局已经注册了哪些interpreter,以及这些interpreter的配置
InterpreterInfoSaving类的唯一实例会被持久化到${ZEPPELIN_HOME}/conf/interpreter.json文件,如下图:
可以看出,InterpreterInfoSaving类是${ZEPPELIN_HOME}/conf/interpreter.json持久化文件相对应的内存对象。
InterpreterSetting
InterpreterSettings是一份Interpreter创建的”contract”,其职责如下:
1. 维护了interpreter相关的元信息,这些信息对于正确运行intepreter进程至关重要,如:与外界环境进行交互(如spark on Yarn集群)、第三方依赖加载等
2. 维护了note和intepreterGroup之间的关系
上篇中我们提到InterpreterGroup是创建jvm、note bind的最小单位,因此,也就决定了在多个note时,创建jvm的个数以及在多个note之间进行变量共享的程度。每个interpreter进程可以有一个或者多个interpreterGroup,每个interpreter实例都从属于其中某一个InterpreterGroup。三者之间的关系如下图:
如下是spark interpreter的settings:
"2C6QR3FVF": {
"id": "2C6QR3FVF",
"name": "spark",
"group": "spark",
"properties": {
"spark.executor.memory": "",
"args": "",
"zeppelin.spark.printREPLOutput": "true",
"spark.cores.max": "",
"zeppelin.dep.additionalRemoteRepository": "spark-packages,http://dl.bintray.com/spark-packages/maven,false;",
"zeppelin.spark.importImplicit": "true",
"zeppelin.spark.sql.stacktrace": "false",
"zeppelin.spark.concurrentSQL": "false",
"zeppelin.spark.useHiveContext": "true",
"zeppelin.pyspark.python": "python",
"zeppelin.dep.localrepo": "local-repo",
"zeppelin.interpreter.localRepo": "..//local-repo/2C6QR3FVF",
"zeppelin.R.knitr": "true",
"zeppelin.spark.maxResult": "1000",
"master": "local[*]",
"spark.app.name": "Zeppelin",
"zeppelin.R.image.width": "100%",
"zeppelin.R.render.options": "out.format \u003d \u0027html\u0027, comment \u003d NA, echo \u003d FALSE, results \u003d \u0027asis\u0027, message \u003d F, warning \u003d F",
"zeppelin.R.cmd": "R"
},
"interpreterGroup": [
{
"class": "org.apache.zeppelin.spark.SparkInterpreter",
"name": "spark"
},
{
"class": "org.apache.zeppelin.spark.SparkSqlInterpreter",
"name": "sql"
},
{
"class": "org.apache.zeppelin.spark.DepInterpreter",
"name": "dep"
},
{
"class": "org.apache.zeppelin.spark.PySparkInterpreter",
"name": "pyspark"
},
{
"class": "org.apache.zeppelin.spark.SparkRInterpreter",
"name": "r"
}
],
"dependencies": [],
"option": {
"remote": true,
"perNoteSession": false,
"perNoteProcess": false,
"isExistingProcess": false,
"port": "-1"
}
}
InterpreterSettings实例被InterpreterFactory对象初始化并填充。
多个note时,究竟是为每个note创建不同的interpreter进程还是共享同一个interpreter进程,zeppelin UI上提供了三种不同的模式可供选择:
Shared模式
一种解释器只有一个Interpreter进程,并且该进程中只有一个InterpreterGroup,所有的Interpreter实例都从属于该InterpreterGroup,当然,也肯定在同一个进程内部。多个note之间,可以很容易的共享变量。
以下关于3种模式的解释来自于Lee Moon Soo相关文章
Scoped模式
一种repl解释器只有一个Interpreter进程,但是与Shared模式不同,会创建过个InterpreterGroup,每个note关联一个InterpreterGroup。这样每个note相当于有了自己的session,session与session相互隔离,但是仍然由于这些InterpreterGroup仍然在同一个进程中,仍然可以在它们之间共享变量。
Isolated模式
独占式,为每个note创建一个独立的intepreter进程,该进程中创建一个InterpreterGroup实例,为该note的服务的Interpreter实例从属于该InterpreterGroup。
SparkInterpreter的Shared/Scoped/Isolated模式
以SparkInterpreter为例,说明这三种模式对SparkContext和Scala repl共享方式上的差别:
SparkInterpreter Shared
所有的note共享同一个SparkContext和Scala REPL实例,因此,如果其中一个note定义了变量a,另外一个note可以访问并且修改该变量a。
SparkInterpreter Scoped
所有的note共享同一个SparkContext,所有的spark job都是通过同一个SparkContext提交的,但是不同的Scala repl解释器,由于不同享Scala repl,故不存在一个note访问并修改了另一个note定义的变量的问题。
SparkInterpreter Isolated
独占式,每个note都有自己的SparkContext和Scala repl,不共享。
其实这三种模式,底层都是通过InterpreterOption类来控制的。
InterpreterOption
虽然InterpreterOption是一个简单的POJO,但是其字段取值,直接决定了zeppelin在创建interpreter进程时的处理方式,直接体现了”性能和资源占用之间进行trade-off”多种策略:
property | 含义 | 说明 |
---|---|---|
existingProcess | 是否连接已有intereter进程 | intereter进程可以独立启动,如在远程(不同与zeppelin运行的其他host)节点上,可以采用该手段将repl进程进行分布式部署,并且先于zeppelin启动,让zeppelin连接到这些已有的线程,以解决intereter进程横向扩展的问题。该模式需要显式指定host和port,表示zeppelin主进程(ZeppelinServer所在进程)与该interpreter进程进行的IPC时,通信的socket。 |
perNoteProcess | 每个Note创建一个IntereterGroup进程 | 这是最细粒度的interpreter进程创建方式,同时也是最粗放的资源使用的方式,如果interpreter都在运行zeppelin的host上启动的话, note数量很多的话,很可能造该host内存耗尽 |
perNoteSession | 每个Note的在同一个IntereterGroup进程中创建不同的Interpreter实例 | 创建interpreter进程时只创建一个,但是在该进程内部,创建不同的interpreter实例。这种方式比perNoteProcess要节省内存资源。 |
InterpreterSettings是InterpreterOption的使用方,重要的地方见其getInterpreterGroup和getInterpreterProcessKey方法:
private String getInterpreterProcessKey(String noteId) {
if (getOption().isExistingProcess) {
return Constants.EXISTING_PROCESS;//existingProcess模式,共享现有的IntereterGroup进程
} else if (getOption().isPerNoteProcess()) {
return noteId;//perNoteProcess模式,每个note都会创建一个新的进程
} else {
return SHARED_PROCESS;//sharedProcess模式,所有note共享同一个interpreter进程
}
}
//为note创建新的InterpreterGroup或者是关联现有的InterpreterGroup
public InterpreterGroup getInterpreterGroup(String noteId) {
String key = getInterpreterProcessKey(noteId);
synchronized (interpreterGroupRef) {
if (!interpreterGroupRef.containsKey(key)) {
String interpreterGroupId = id() + ":" + key;
InterpreterGroup intpGroup =
interpreterGroupFactory.createInterpreterGroup(interpreterGroupId, getOption());
interpreterGroupRef.put(key, intpGroup);
}
return interpreterGroupRef.get(key);
}
}
InterpreterFactory
InterpreterFactory是InterpreterGroupFactory的实现类,承担如下职责:
1. interpreter实例的实际创建者
2. interpreter配置文件的加载与持久化,如loadFromFile()和saveToFile()
3. interpreterSettings的管理
4. 第三方依赖加载器
5. 在zeppelin主进程中建立远程AngularObjectRegistry的本地Proxy——RemoteAngularObjectRegistry,以保证远程interpreter进程与前端angular对象双向绑定。
InterpreterFactory实际执行创建interpreter实例的方式为createInterpretersForNote,具体实现如下:
public void createInterpretersForNote(
InterpreterSetting interpreterSetting,
String noteId,
String key) {
InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(noteId);//调用interpreterSetting.getInterpreterGroup以确定创建InterpreterGroup的策略
String groupName = interpreterSetting.getGroup();
InterpreterOption option = interpreterSetting.getOption();
Properties properties = interpreterSetting.getProperties();
if (option.isExistingProcess) {
properties.put(Constants.ZEPPELIN_INTERPRETER_HOST, option.getHost());
properties.put(Constants.ZEPPELIN_INTERPRETER_PORT, option.getPort());
}
//省略了同步等到相同key interpreterGroup终止的代码
logger.info("Create interpreter instance {} for note {}", interpreterSetting.getName(), noteId);
for (String className : interpreterClassList) {
Set<String> keys = Interpreter.registeredInterpreters.keySet();
for (String intName : keys) {
RegisteredInterpreter info = Interpreter.registeredInterpreters.get(intName);
if (info.getClassName().equals(className)
&& info.getGroup().equals(groupName)) {
Interpreter intp;
if (option.isRemote()) {//在单独启动的intepreter进程中创建Interpreter实例
intp = createRemoteRepl(info.getPath(),
key,
info.getClassName(),
properties,
interpreterSetting.id());
} else {//在zeppelin主进程中,动态加载并反射创建Interpreter实例
intp = createRepl(info.getPath(),
info.getClassName(),
properties);
}
synchronized (interpreterGroup) {
List<Interpreter> interpreters = interpreterGroup.get(key);
if (interpreters == null) {
interpreters = new LinkedList<Interpreter>();
interpreterGroup.put(key, interpreters);//完成noteId:interpreters实例或者interpreterGroup:interpreters实例之间的映射
}
interpreters.add(intp);
}
logger.info("Interpreter " + intp.getClassName() + " " + intp.hashCode() + " created");
intp.setInterpreterGroup(interpreterGroup);
break;
}
}
}
}
以下是createRemoteRepl方法实现,重点是用LazyOpenInterpreter Proxy了一个RemoteInterpreter实例:
private Interpreter createRemoteRepl(String interpreterPath, String noteId, String className,
Properties property, String interpreterSettingId) {
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterSettingId;
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
property, noteId, className, conf.getInterpreterRemoteRunnerPath(),
interpreterPath, localRepoPath, connectTimeout,
maxPoolSize, remoteInterpreterProcessListener));
return intp;
}
InterpreterFactory执行解除note和interpreter实例之间的关系方法见removeInterpretersForNote:
public void removeInterpretersForNote(InterpreterSetting interpreterSetting,
String noteId) {
if (interpreterSetting.getOption().isPerNoteProcess()) {//perNoteProcess,直接关闭process
interpreterSetting.closeAndRemoveInterpreterGroup(noteId);
} else if (interpreterSetting.getOption().isPerNoteSession()) {//perNoteSession,由于process是共享的,note关闭,只关闭该note相关的interpreter实例,process不关闭
InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(noteId);
interpreterGroup.close(noteId);
interpreterGroup.destroy(noteId);
synchronized (interpreterGroup) {
interpreterGroup.remove(noteId);
interpreterGroup.notifyAll(); // notify createInterpreterForNote()
}
logger.info("Interpreter instance {} for note {} is removed",
interpreterSetting.getName(),
noteId);
}
}
参考文献
[1.] Apache Zeppelin, Interpreter mode explained, Lee Moon Soo