zeppelin的module、package、class众多,如何快速地理清头绪,抓住重点?本文分析zeppelin主要module中重点的类以及它们之间的关系,理清这些类的职责,对于理解zeppelin的运行过程至关重要。
经过之前文章的分析,我们已经了解了zeppelin涉及到框架层面的几个module为:zeppelin-server、zeppelin-zengine、zeppelin-interpreter,并且三者之间有如下的依赖关系:
本文要分析的主要的class,也都来自于这三个module。
以上类图中省略了字段和方法,以避免过早引入太多细节,重点关注类与类之间的关系组成。由于篇幅的限制,再加上zeppelin提供的核心价值是与Interpreter相关的多语言repl解释器,笔者就选择从右上角黄色的区域开始,分多篇分析。
Interpreter
Interpreter是一个抽象类,该类是zeppelin核心类,Zeppelin提供的核心价值:解释执行各种语言的代码,都是通过该抽象类的每个具体的实现类完成的。Interpreter主要规定了各语言repl解释器需要遵循的“规范(contract)”,包括:
1. repl解释器的生命周期管理。如open(), close(), destroy(),规定了产生和销毁repl解释器。
2. 解释执行代码的接口——interpreter(),这些真正产生价值的地方。
3. 执行代码过程中交互控制和易用性增强,如cancel(), getProgress(), completion(),分别是终止代码的执行、获取执行进度以及代码自动完成。
4. 解释器的配置接口,如setProperty()、setClassLoaderURL(URL[])等。
5. 性能优化接口,如getScheduler(),getIntepreterGroup()等。
6. 解释器注册接口(已经deprecated了),如一系列重载的register接口。
以上体现了zeppelin的repl解释器进程需要受其主进程ZeppelinServer的控制,也是zeppelin设计决策在代码中的体现。
注:现在的解释器注册通过如下2种方式进行:
- 将interpreter-setting.json打包到解释器的jar文件中
- 放置到如下位置:interpreter/{interpreter}/interpreter-setting.json
RemoteInterpreterService Thrift协议分析
Apache Thrift是跨语言RPC通信框架,提供了相应的DSL(Domain Specific Language)和支持多种语言的代码生成工具,使得代码开发人员可以只关注具体的业务,而不用关注底层的通信细节。zeppelin使用Thrift定义了其主进程ZeppelinServer与需要采用独立JVM进程运行的各repl解释器之间的通信协议。
关于为什么要采用单独的JVM进程来启动repl解释器进程,本系列的第3篇也有提及,这里再赘述一下:
1. zeppelin旨在提供一个开放的框架,支持多种语言和产品,由于每种语言和产品都是各自独立演进的,各自的运行时依赖也各不相同,甚至是相互冲突的,如果放在同一JVM中,仅解决冲突,维护各个产品之间的兼容性都是一项艰巨的任务,某些产品版本甚至是完全不能兼容的。
2. 大数据分析,是否具有横向扩展能力是production-ready一项重要的衡量指标,如果将repl进程与主进程合在一起,会验证影响系统性能。
因此,在有必要的时候,zeppelin采用独立JVM的方式来启动repl进程,并且采用Thrift协议定义了主进程与RemoteInterpreterService进程之间的通信协议,具体如下:
service RemoteInterpreterService {
void createInterpreter(1: string intpGroupId, 2: string noteId, 3: string className, 4: map<string, string> properties);
void open(1: string noteId, 2: string className);
void close(1: string noteId, 2: string className);
RemoteInterpreterResult interpret(1: string noteId, 2: string className, 3: string st, 4: RemoteInterpreterContext interpreterContext);
void cancel(1: string noteId, 2: string className, 3: RemoteInterpreterContext interpreterContext);
i32 getProgress(1: string noteId, 2: string className, 3: RemoteInterpreterContext interpreterContext);
string getFormType(1: string noteId, 2: string className);
list<InterpreterCompletion> completion(1: string noteId, 2: string className, 3: string buf, 4: i32 cursor);
void shutdown();
string getStatus(1: string noteId, 2:string jobId);
RemoteInterpreterEvent getEvent();
// as a response, ZeppelinServer send list of resources to Interpreter process
void resourcePoolResponseGetAll(1: list<string> resources);
// as a response, ZeppelinServer send serialized value of resource
void resourceResponseGet(1: string resourceId, 2: binary object);
// get all resources in the interpreter process
list<string> resourcePoolGetAll();
// get value of resource
binary resourceGet(1: string noteId, 2: string paragraphId, 3: string resourceName);
// remove resource
bool resourceRemove(1: string noteId, 2: string paragraphId, 3:string resourceName);
void angularObjectUpdate(1: string name, 2: string noteId, 3: string paragraphId, 4: string
object);
void angularObjectAdd(1: string name, 2: string noteId, 3: string paragraphId, 4: string object);
void angularObjectRemove(1: string name, 2: string noteId, 3: string paragraphId);
void angularRegistryPush(1: string registry);
}
与前面的Interpreter类的定义进行对比不难发现,RemoteInterpreterService Thrift接口与Interpreter抽象类定义的接口大部分相同,不同之处在于:
1. RemoteInterpreterService接口的实现类由于运行在不同的JVM中,需要在每个接口方法中额外传递环境信息,如noteId和className等,如createInterpreter、open、close、cancel等。
2. RemoteInterpreterService接口中多出了两种类型的接口,一种是为了完成ZeppelinServer进程和RemoteInterpreter进程之间的resource协商(neigotiation),如resourceXXX接口;另一种是为了完成2者之间angular object的前后台双向绑定,如augularXXX接口。
具体文件位置见:
${ZEPPELIN_HOME}/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift。在其同级目录下,zeppelin还提供了代码生成脚本genthrift.sh:
thrift --gen java RemoteInterpreterService.thrift
mv gen-java/org/apache/zeppelin/interpreter/thrift ../java/org/apache/zeppelin/interpreter/thrift
rm -rf gen-java
可以看出,
${ZEPPELIN_HOME}/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift目录下所有文件都是Thrift的代码生成器根据该接口文件自动生成的。如果我们修改过该接口文件,则需要重新执行该脚本。
InterpreterGroup
InterpterGroup继承了ConcurrentHashMap
RemoteInterpreterProcess
RemoteInterpreterProcess是采用独立JVM启动repl进程的具体执行类,它采用Apache Commons Exec框架来根据Zeppelin主进程的”指示”启动独立进程,具体逻辑如下:
port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
CommandLine cmdLine = CommandLine.parse(interpreterRunner);
cmdLine.addArgument("-d", false);
cmdLine.addArgument(interpreterDir, false);
cmdLine.addArgument("-p", false);
cmdLine.addArgument(Integer.toString(port), false);
cmdLine.addArgument("-l", false);
cmdLine.addArgument(localRepoDir, false);
executor = new DefaultExecutor();
watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
executor.setWatchdog(watchdog);
running = true;
try {
Map procEnv = EnvironmentUtils.getProcEnvironment();
procEnv.putAll(env);
logger.info("Run interpreter process {}", cmdLine);
executor.execute(cmdLine, procEnv, this);
} catch (IOException e) {
//省略...
}
这里有几点主要注意:
1. 该进程端口是zeppelin自动寻找操作系统中当前可用的端口
2. RemoteInterpreterProcess并非在在构造函数中,就启动JVM,而是在被引用(reference方法被调用)之后,才启动的
3. 具体的interpterRunner脚本为${ZEPPELIN_HOME}/bin/interpreter.sh,参见
ZeppelinConfiguration.getInterpreterRemoteRunnerPath()
interpreter.sh文件重点部分如下:
##省略环境变量和classpath拼接等内容
ZEPPELIN_SERVER=org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer
SPARK_APP_JAR="$(ls ${ZEPPELIN_HOME}/interpreter/spark/zeppelin-spark*.jar)"
if [[ -n "${SPARK_SUBMIT}" ]]; then
${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path "${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${CLASSPATH}" --driver-java-options "${JAVA_INTP_OPTS}" ${SPARK_SUBMIT_OPTIONS} ${SPARK_APP_JAR} ${PORT} &
else
${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} &
fi
可以看出,在单机环境下,主要是在启动单独JVM进程,执行RemoteInterpreterServer,并向其main方法,传递必要的参数。在spark环境下,我们会单独分析,此处暂时略过。
RemoteInterpreter
RemoteInterpreter可能是zeppelin中最误导人的类命名了,笔者认为其命名为RemoteIntepreterProxy,或者是InterpterProxy、InterpterStub更合适一些,因为其本质是远程Interpter的本地代理,是Proxy模式的典型应用,其运行在zeppelin主进程中,通过Thrift服务的Client来控制远程Interpreter的执行。
通过其初始化代码,可见一斑:
//省略掉了出错处理等其他内容
public synchronized void init() {
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
final InterpreterGroup interpreterGroup = getInterpreterGroup();
interpreterProcess.reference(interpreterGroup);
synchronized (interpreterProcess) {
Client client = interpreterProcess.getClient();
client.createInterpreter(groupId, noteId, getClassName(), (Map) property);
}
}