大数据之Oozie——源码分析(一)程序入口

时间:2023-03-08 17:05:41

工作中发现在oozie中使用sqoop与在shell中直接调度sqoop性能上有很大的差异。为了更深入的探索其中的缘由,开始了oozie的源码分析之路。今天第一天阅读源码,由于没有编译成功,不能运行测试用例,直接使用sublime肉眼阅读,还是挺费劲的。

虽然流程还不是顺畅,但是大体上的内容还算是了解了。

我这里使用的是oozie4.2的版本,之前稍微看过4.3版本的,源码上还是有一定的差异的。

大数据之Oozie——源码分析(一)程序入口

看上面的图,大致理解oozie的过程是:

  • oozie cli提交任务
  • oozie server创建一个对应任务的client
  • client去提交相应的任务

oozie工程结构

最重要的就是三个:

  • 1 client 这是任务提交的入口
  • 2 core 这是oozie的核心(在3中好像拆分成了core和server)
  • 3 distro 这里保存了启动脚本

寻找源码入口

  • 一种方式是直接以文件夹搜索main方法。
  • 另一种是看它的启动脚本。

在启动脚本中oozie.cmd,有这样一句:

%JAVA_BIN% %JAVA_PROPERTIES% -cp %OOZIECPPATH% org.apache.oozie.cli.OozieCLI %OOZIE_PROPERTIES%

可见,入口在org.apache.oozie.cli.OozieCLI这个类中,那就从它开始吧。

sqoop作业的提交

首先是OozieCLI的入口main方法:

public static void main(String[] args) {
//oozie方法的入口
if (!System.getProperties().containsKey(AuthOozieClient.USE_AUTH_TOKEN_CACHE_SYS_PROP)) {
System.setProperty(AuthOozieClient.USE_AUTH_TOKEN_CACHE_SYS_PROP, "true");
}
System.exit(new OozieCLI().run(args));
}

前面是一些认证的东西,可以忽略,直接进入run方法:

public synchronized int run(String[] args) {
//保证clent仅启动一次
if (used) {
throw new IllegalStateException("CLI instance already used");
}
used = true;
//创建参数解析器
final CLIParser parser = getCLIParser();
try {
final CLIParser.Command command = parser.parse(args); String doAsUser = command.getCommandLine().getOptionValue(DO_AS_OPTION); if (doAsUser != null) {
OozieClient.doAs(doAsUser, new Callable<Void>() {
@Override
public Void call() throws Exception {
processCommand(parser, command);
return null;
}
});
}
else {
processCommand(parser, command);
}
return 0;
}
...
}

主要的内容是在这个processCommand里面,processCommand会根据命令调用相应的命令方法:

public void processCommand(CLIParser parser, CLIParser.Command command) throws Exception {
if (command.getName().equals(HELP_CMD)) {
parser.showHelp(command.getCommandLine());
}
else if (command.getName().equals(JOB_CMD)) {
jobCommand(command.getCommandLine());
}
else if (command.getName().equals(JOBS_CMD)) {
jobsCommand(command.getCommandLine());
}
else if (command.getName().equals(ADMIN_CMD)) {
adminCommand(command.getCommandLine());
}
else if (command.getName().equals(VERSION_CMD)) {
versionCommand();
}
else if (command.getName().equals(VALIDATE_CMD)) {
validateCommand(command.getCommandLine());
}
else if (command.getName().equals(SLA_CMD)) {
slaCommand(command.getCommandLine());
}
else if (command.getName().equals(PIG_CMD)) {
scriptLanguageCommand(command.getCommandLine(), PIG_CMD);
}
else if (command.getName().equals(HIVE_CMD)) {
scriptLanguageCommand(command.getCommandLine(), HIVE_CMD);
}
else if (command.getName().equals(SQOOP_CMD)) {
sqoopCommand(command.getCommandLine());//我关注的sqoop在这里
}
else if (command.getName().equals(INFO_CMD)) {
infoCommand(command.getCommandLine());
}
else if (command.getName().equals(MR_CMD)){
mrCommand(command.getCommandLine());
}
}

在sqoopCommand方法里面,sqoop任务被提交:

private void sqoopCommand(CommandLine commandLine) throws IOException, OozieCLIException {
List<String> args = commandLine.getArgList();
if (args.size() > 0) {
// checking if args starts with -X (because CLIParser cannot check this)
if (!args.get(0).equals("-X")) {
throw new OozieCLIException("Unrecognized option: " + args.get(0) + " Expecting -X");
}
args.remove(0);
} if (!commandLine.hasOption(SQOOP_COMMAND_OPTION)) {
throw new OozieCLIException("Need to specify -command");
} if (!commandLine.hasOption(CONFIG_OPTION)) {
throw new OozieCLIException("Need to specify -config <configfile>");
} try {
XOozieClient wc = createXOozieClient(commandLine);
Properties conf = getConfiguration(wc, commandLine);
String[] command = commandLine.getOptionValues(SQOOP_COMMAND_OPTION);
System.out.println(JOB_ID_PREFIX + wc.submitSqoop(conf, command, args.toArray(new String[args.size()])));
}
catch (OozieClientException ex) {
throw new OozieCLIException(ex.toString(), ex);
}
}

最重要的内容就在这几行:

XOozieClient wc = createXOozieClient(commandLine);
Properties conf = getConfiguration(wc, commandLine);
String[] command = commandLine.getOptionValues(SQOOP_COMMAND_OPTION);
System.out.println(JOB_ID_PREFIX + wc.submitSqoop(conf, command, args.toArray(new String[args.size()])));

其中wc.submitSqoop提交了sqoop的任务。

后续问题

  • 1 任务提交到了哪里?
  • 2 在提交任务的时候都做了很么?
  • 3 如何在mapreduce开启一个新的sqoop的?
  • 4 为什么在yarn中可以同时看到两个应用,一个oozie,一个是sqoop

参考

1 oozie(4.1.0)架构及二次开发流程