一、概述
目前计算框架和作业类型繁多:包括MapReduce Java、Streaming、HQL、Pig等,如何对这些框架和作业进行统一管理和调度是我们需要面临的一个问题。目前有多种解决方案:可以使用Crontab,也可以自己设计调度系统,还可以直接使用开源系统。
如果我们自己设计调度系统,可以通过crontab+shell来实现,如下所示:
//mapreduce_job.sh
cmd=“hadoop jar example1.jar xxx –D …..”
$cmd
code=$?
while [ $code != 0 ]; do
echo “run job failed, run submit……"
$cmd
done
//hive_job.sh
cmd=“hive -e \“use mydatabase; ALTER TABLE t_aa ADD IF NOT EXISTS PARTITION (pt='2013052515') location '/group/data/aa/2013-05-25-15/';SELECT COUNT(1) FROM t_aa WHERE t_aa.pt = '2013052515‘\” ”
$cmd
code=$?
while [ $code != 0 ]; do
echo “run hql failed, run submit……"
$cmd
done
//main.sh
sh mapreduce_job.sh
sh hive_job.sh
//crontab
*/10 * * * * root /opt/bin/main.sh
可以看到自己设计调度系统有几个缺点:
- 编写复杂,不灵活
- 不易于管理
- 难以与监控、报警相结合
此时我们就需要使用开源作业流调度系统,常见的如下:
- Oozie:Yahoo!开源,基于xml表达作业依赖关系;
- Azkaban:Linkedin开源,通过Java property配置作业依赖关系
- Zeus(宙斯):阿里开源,通过界面配置作业依赖关系
- 其他开源系统:Cascading(通过Java API编程实现作业依赖关系)
下面我们来详细阐述一下哎Oozie的基本使用。
二、Oozie架构和原理
1、什么是Oozie?
Apache Oozie 是用于 Hadoop 平台的一种工作流调度引擎。该框架使用 Oozie 协调器促进了相互依赖的重复工作之间的协调,我们可以使用预定的时间或数据可用性来触发 Apache Oozie。也可以使用 Oozie bundle 系统提交或维护一组协调应用程序。它在Hadoop生态系统中的位置如下:
例如,Oozie可以 运行一个 Apache Sqoop 作业,以便在 MySQL 数据库中的数据上执行导入操作,并将数据传输到 Hadoop 分布式文件系统 (HDFS) 中。可以利用导入的数据集执行 Sqoop 合并操作,从而更新较旧的数据集。通过利用 UNIX shell 操作,可从 MySQL 数据库中提取用来执行 Sqoop 作业的元数据。同理,可执行 Java 操作来更新 Sqoop 作业所需的 MySQL 数据库中的元数据。
2、Oozie的基本原理
本质上来说,Oozie是一种Java Web应用程序,它运行在Java servlet容器(例如Tomca中),并使用数据来存储相关数据,其基本运行流程如下:
- 工作流定义
当前运行的工作流实例,包括实例的状态和变量。Oozie工作流是放置在控制依赖DAG(有向无环图 Direct Acyclic Graph)中的一组动作(例如,Hadoop的Map/Reduce作业、Pig作业等),其中指定了动作执行的顺序。我们会使用hPDL(一种XML流程定义语言)来描述这个图。 - hPDL
hPDL是一种很简洁的语言,只会使用少数流程控制和动作节点。控制节点会定义执行的流程,并包含工作流的起点和终点(start、end和fail节点)以及控制工作流执行路径的机制(decision、fork和join节点)。动作节点是一些机制,通过它们工作流会触发执行计算或者处理任务。Oozie为以下类型的动作提供支持: Hadoop map-reduce、Hadoop文件系统、Pig、Java和Oozie的子工作流(SSH动作已经从Oozie schema 0.2之后的版本中移除了)。 - 计算过程
所有由动作节点触发的计算和处理任务都不在Oozie之中——它们是由Hadoop的Map/Reduce框架执行的。这种方法让Oozie可以支持现存的Hadoop用于负载平衡、灾难恢复的机制。这些任务主要是异步执行的(只有文件系统动作例外,它是同步处理的)。这意味着对于大多数工作流动作触发的计算或处理任务的类型来说,在工作流操作转换到工作流的下一个节点之前都需要等待,直到计算或处理任务结束了之后才能够继续。Oozie可以通过两种不同的方式来检测计算或处理任务是否完成,也就是回调和轮询。当Oozie启动了计算或处理任务的时候,它会为任务提供唯一的回调URL,然后任务会在完成的时候发送通知给特定的URL。在任务无法触发回调URL的情况下(可能是因为任何原因,比方说网络闪断),或者当任务的类型无法在完成时触发回调URL的时候,Oozie有一种机制,可以对计算或处理任务进行轮询,从而保证能够完成任务。 - 参数化定义
Oozie工作流可以参数化(在工作流定义中使用像${inputDir}之类的变量)。在提交工作流操作的时候,我们必须提供参数值。如果经过合适地参数化(比方说,使用不同的输出目录),那么多个同样的工作流操作可以并发。 - 设计触发器
一些工作流是根据需要触发的,但是大多数情况下,我们有必要基于一定的时间段和(或)数据可用性和(或)外部事件来运行它们。Oozie协调系统(Coordinator system)让用户可以基于这些参数来定义工作流执行计划。Oozie协调程序让我们可以以谓词的方式对工作流执行触发器进行建模,那可以指向数据、事件和(或)外部事件。工作流作业会在谓词得到满足的时候启动。 - 设计定时器
经常我们还需要连接定时运行、但时间间隔不同的工作流操作。多个随后运行的工作流的输出会成为下一个工作流的输入。把这些工作流连接在一起,会让系统把它作为数据应用的管道来引用。Oozie协调程序支持创建这样的数据应用管道。
3、Oozie 层次结构
Oozie是管理Hadoop作业的工作流调度系统。Oozie定义了控制流节点和动作节点。其基本层次结构如下:
- Workflow:顺序执行流程节点;
- Coordinator:定时触发workflow;
- Bundle Job:绑定多个Coordinator。
4、Oozie安装
Oozie的安装较为复杂,这里不详细展开,具体可参考这位博主的相关博客。点我
三、Oozie使用示例
我们创建一个简单的示例。我们拥有两个Map/Reduce作业——一个会获取最初的数据,另一个会合并指定类型的数据。实际的获取操作需要执行最初的获取操作,然后把两种类型的数据——Lidar和Multicam——合并。为了让这个过程自动化,我们需要创建一个简单的Oozie工作流。
<!--
Copyright (c) 2011 NAVTEQ! Inc. All rights reserved.
NGMB IPS ingestor Oozie Script
-->
<workflow-app xmlns='uri:oozie:workflow:0.1' name='NGMB-IPS-ingestion'>
<start to='ingestor'/>
<action name='ingestor'>
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>default</value>
</property>
</configuration>
<main-class>com.navteq.assetmgmt.MapReduce.ips.IPSLoader</main-class>
<java-opts>-Xmx2048m</java-opts>
<arg>${driveID}</arg>
</java>
<ok to="merging"/>
<error to="fail"/>
</action>
<fork name="merging">
<path start="mergeLidar"/>
<path start="mergeSignage"/>
</fork>
<action name='mergeLidar'>
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>default</value>
</property>
</configuration>
<main-class>com.navteq.assetmgmt.hdfs.merge.MergerLoader</main-class>
<java-opts>-Xmx2048m</java-opts>
<arg>-drive</arg>
<arg>${driveID}</arg>
<arg>-type</arg>
<arg>Lidar</arg>
<arg>-chunk</arg>
<arg>${lidarChunk}</arg>
</java>
<ok to="completed"/>
<error to="fail"/>
</action>
<action name='mergeSignage'>
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>default</value>
</property>
</configuration>
<main-class>com.navteq.assetmgmt.hdfs.merge.MergerLoader</main-class>
<java-opts>-Xmx2048m</java-opts>
<arg>-drive</arg>
<arg>${driveID}</arg>
<arg>-type</arg>
<arg>MultiCam</arg>
<arg>-chunk</arg>
<arg>${signageChunk}</arg>
</java>
<ok to="completed"/>
<error to="fail"/>
</action>
<join name="completed" to="end"/>
<kill name="fail">
<message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name='end'/>
</workflow-app>
这个工作流定义了三个动作:ingestor、mergeLidar和mergeSignage。并把每个动作都实现为Map/Reduce[4]作业。这个工作流从start节点开始,然后把控制权交给Ingestor动作。一旦ingestor步骤完成,就会触发fork控制节点 [4],它会并行地开始执行mergeLidar和mergeSignage[5]。这两个动作完成之后,就会触发join控制节点[6]。join节点成功完成之后,控制权就会传递给end节点,它会结束这个过程。
创建工作流之后,我们需要正确地对其进行部署。典型的Oozie部署是一个HDFS目录,其中包含workflow.xml、config-default.xml和lib子目录,其中包含有工作流操作所要使用的类的jar文件。
config-default.xml文件是可选的,通常其中会包含对于所有工作流实例通用的工作流参数。如下显示的是config-default.xml的简单示例。
<configuration>
<property>
<name>jobTracker</name>
<value>master:2010</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://master:8020</value>
</property>
<property>
<name>queueName</name>
<value>default</value>
</property>
</configuration>
完成了工作流的部署之后,我们可以使用Oozie提供的命令行工具,它可以用于提交、启动和操作工作流。这个工具一般会运行在Hadoop簇集的edge节点上,并需要一个作业属性文件(参见配置工作流属性)。
oozie.wf.application.path=hdfs://sachicn001:8020/user/blublins/workflows/IPSIngestion
jobTracker=sachicn003:2010
nameNode=hdfs://sachicn001:8020
- 命令行方式
有了作业属性,我们就可以使用如下中的命令来运行Oozie工作流。
oozie job –oozie http://sachidn002.hq.navteq.com:11000/oozie/ -D driveID=729-pp00002-2011-02-08-09-59-34 -D lidarChunk=4 -D signageChunk=20 -config job.properties –run
- 编程方式
尽管上面所述的命令行界面能够很好地用于手动调用Oozie,但有时使用编程的方式调用Oozie更具有优势。当Oozie工作流是特定的应用程序或者大型企业过程的一部分,这就会很有用。我们可以使用Oozie Web Services APIs [6]或者Oozie Java client APIs [7]来实现这种编程方式的调用。代码5中展现的就是很简单的Oozie Java客户端的例子,它会触发上面描述的过程。
package com.kang.oozie;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.WorkflowJob.Status;
public class WorkflowClient {
private static String OOZIE_URL = "http://sachidn002.hq.navteq.com:11000/oozie/";
private static String JOB_PATH = "hdfs://sachicn001:8020/user/blublins/workflows/IPSIngestion";
private static String JOB_Tracker = "sachicn003:2010";
private static String NAMENode = "hdfs://sachicn001:8020";
OozieClient wc = null;
public WorkflowClient(String url){
wc = new OozieClient(url);
}
public String startJob(String wfDefinition, List<WorkflowParameter> wfParameters)
throws OozieClientException{
// create a workflow job configuration and set the workflow application path
Properties conf = wc.createConfiguration();
conf.setProperty(OozieClient.APP_PATH, wfDefinition);
// setting workflow parameters
conf.setProperty("jobTracker", JOB_Tracker);
conf.setProperty("nameNode", NAMENode);
if((wfParameters != null) && (wfParameters.size() > 0)){
for(WorkflowParameter parameter : wfParameters)
conf.setProperty(parameter.getName(), parameter.getValue());
}
// submit and start the workflow job
return wc.run(conf);
}
public Status getJobStatus(String jobID) throws OozieClientException{
WorkflowJob job = wc.getJobInfo(jobID);
return job.getStatus();
}
public static void main(String[] args) throws OozieClientException, InterruptedException{
// Create client
WorkflowClient client = new WorkflowClient(OOZIE_URL);
// Create parameters
List<WorkflowParameter> wfParameters = new LinkedList<WorkflowParameter>();
WorkflowParameter drive = new WorkflowParameter("driveID","729-pp00004-2010-09-01-09-46");
WorkflowParameter lidar = new WorkflowParameter("lidarChunk","4");
WorkflowParameter signage = new WorkflowParameter("signageChunk","4");
wfParameters.add(drive);
wfParameters.add(lidar);
wfParameters.add(signage);
// Start Oozing
String jobId = client.startJob(JOB_PATH, wfParameters);
Status status = client.getJobStatus(jobId);
if(status == Status.RUNNING)
System.out.println("Workflow job running");
else
System.out.println("Problem starting Workflow job");
}
}
在此,我们首先使用Oozie服务器URL对工作流客户端进行初始化。初始化过程完成之后,我们就可以使用客户端提交并启动作业(startJob方法),获得正在运行的作业的状态(getStatus方法),以及进行其他操作。
- 构建java动作,向工作流传递参数
在之前的示例中,我们已经展示了如何使用标签向Java节点传递参数。由于Java节点是向Oozie引入自定义计算的主要方法,因此能够从Java节点向Oozie传递数据也同样重要。
我们可以使用“capture-output””元素把Java节点生成的值传递回给Oozie上下文。然后,工作流的其它步骤可以通过EL-functions访问这些值。返回值需要以Java属性格式文件写出来。我们可以通过“JavaMainMapper.OOZIE_JAVA_MAIN_CAPTURE_OUTPUT_FILE”常量从System属性中获得这些属性文件的名称。如下演示了如何完成这项操作。
package com.kang.oozie;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.Properties;
public class GenerateLookupDirs {
/**
* @param args
*/
public static final long dayMillis = 1000 * 60 * 60 * 24;
private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";
public static void main(String[] args) throws Exception {
Calendar curDate = new GregorianCalendar();
int year, month, date;
String propKey, propVal;
String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
if (oozieProp != null) {
File propFile = new File(oozieProp);
Properties props = new Properties();
for (int i = 0; I < 8; ++i) {
year = curDate.get(Calendar.YEAR);
month = curDate.get(Calendar.MONTH) + 1;
date = curDate.get(Calendar.DATE);
propKey = "dir"+i;
propVal = year + "-" +
(month < 10 ? "0" + month : month) + "-" +
(date < 10 ? "0" + date : date);
props.setProperty(propKey, propVal);
curDate.setTimeInMillis(curDate.getTimeInMillis() - dayMillis);
}
OutputStream os = new FileOutputStream(propFile);
props.store(os, "");
os.close();
} else
throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES
+ " System property not defined");
}
}