Oozie Spark Action 配置
- Spark Action 用来运行spark 任务,流程任务必须等当前节点的spark任务执行完成之后才能执行后续节点任务。
- 运行Spark Job,必须在 spark action里面配置 job-tracer,name-node,master,和一些必要的参数和配置。
- Spark options 可以用 " spark-opts" 元素来配置
- 同Shell Action一样 Spark Action 可以配置成创建或者删除HDFS目录之后再去执行一个Sqoop任务
- Spark 应用的配置可以使用job-xml文件中的元素,也可以使用内部元素来配置,像EL表达式也支持在内部元素中的配置,内部元素的配置可以覆盖外部文件中的配置。
Spark Action格式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
<
workflow-app
name
=
"[WF-DEF-NAME]"
xmlns
=
"uri:oozie:workflow:0.3"
>
...
<
action
name
=
"[NODE-NAME]"
>
<
spark
xmlns
=
"uri:oozie:spark-action:0.1"
>
<
job-tracker
>[JOB-TRACKER]</
job-tracker
>
<
name-node
>[NAME-NODE]</
name-node
>
<
prepare
>
<
delete
path
=
"[PATH]"
/>
...
<
mkdir
path
=
"[PATH]"
/>
...
</
prepare
>
<
job-xml
>[SPARK SETTINGS FILE]</
job-xml
>
<
configuration
>
<
property
>
<
name
>[PROPERTY-NAME]</
name
>
<
value
>[PROPERTY-VALUE]</
value
>
</
property
>
...
</
configuration
>
<
master
>[SPARK MASTER URL]</
master
>
<
mode
>[SPARK MODE]</
mode
>
<
name
>[SPARK JOB NAME]</
name
>
<
class
>[SPARK MAIN CLASS]</
class
>
<
jar
>[SPARK DEPENDENCIES JAR / PYTHON FILE]</
jar
>
<
spark-opts
>[SPARK-OPTIONS]</
spark-opts
>
<
arg
>[ARG-VALUE]</
arg
>
...
<
arg
>[ARG-VALUE]</
arg
>
...
</
spark
>
<
ok
to
=
"[NODE-NAME]"
/>
<
error
to
=
"[NODE-NAME]"
/>
</
action
>
...
</
workflow-app
>
|
-
prepare 元素 如果存在,表明在执行sqoop 命令之前需要执行的一系列 hdfs路径的创建和删除操作,并且路径必须以 hdfs://HOST:PORT 开头
-
job-xml 元素 如果存在,则作为sqoop任务的配置文件,从 schema 0.3开始支持多个job-xml元素用来支持多个job.xml文件
-
configuration 用来给spark任务传递参数
-
master用来指定spark master 例如: spark://host:port, mesos://host:port, yarn-cluster, yarn-master, or local.
-
mode以集群或者客户端的模式运行spark 程序 例如 :client,cluster
-
name spark 应用的名称
-
classspark运行程序的主类名
-
jarspark 应用需要引用的其它jar包
-
spark-opts提交给驱动程序的参数。比如--conf key=value或者是在oozie-site.xml中配置的oozie.service.SparkConfiguationService.spark.configurations
-
arg spark 应用参数
Spark Action 使用实例一:Oozie自带案例运行,使用oozie调度spark程序
1,首先下载Oozie自带的例子,解压,打开到 examples\apps\spark 目录,根据自己的安装环境修改之后的job.properties文件如下
1
2
3
4
5
6
7
|
jobTracker=hadoop-node1.novalocal:8021
master=local[*]
queueName=default
examplesRoot=xwj_test
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/oozie/${examplesRoot}/apps/spark
|
2,根据自己测试环境路径,修改后的workflow.xml 内容如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
<
workflow-app
xmlns
=
'uri:oozie:workflow:0.5'
name
=
'SparkFileCopy'
>
<
start
to
=
'spark-node'
/>
<
action
name
=
'spark-node'
>
<
spark
xmlns
=
"uri:oozie:spark-action:0.1"
>
<
job-tracker
>${jobTracker}</
job-tracker
>
<
name-node
>${nameNode}</
name-node
>
<
prepare
>
<
delete
path
=
"${nameNode}/user/oozie/${examplesRoot}/apps/spark/output"
/>
</
prepare
>
<
master
>${master}</
master
>
<
name
>Spark-FileCopy</
name
>
<
class
>org.apache.oozie.example.SparkFileCopy</
class
>
<
jar
>${nameNode}/user/oozie/${examplesRoot}/apps/spark/lib/oozie-examples.jar</
jar
>
<
arg
>${nameNode}/user/oozie/${examplesRoot}/apps/spark/input/data.txt</
arg
>
<
arg
>${nameNode}/user/oozie/${examplesRoot}/apps/spark/output</
arg
>
</
spark
>
<
ok
to
=
"end"
/>
<
error
to
=
"fail"
/>
</
action
>
<
kill
name
=
"fail"
>
<
message
>Workflow failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]
</
message
>
</
kill
>
<
end
name
=
'end'
/>
</
workflow-app
>
|
3,查看
org.apache.oozie.example.SparkFileCopy 内容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
package
org.apache.oozie.example;
import
java.io.PrintStream;
import
org.apache.spark.SparkConf;
import
org.apache.spark.api.java.JavaRDD;
import
org.apache.spark.api.java.JavaSparkContext;
public
final
class
SparkFileCopy
{
public
static
void
main(String[] args)
throws
Exception
{
if
(args.length <
2
) {
System.err.println(
"Usage: SparkFileCopy <file> <file>"
);
System.exit(
1
);
}
SparkConf sparkConf =
new
SparkConf().setAppName(
"SparkFileCopy"
);
JavaSparkContext ctx =
new
JavaSparkContext(sparkConf);
JavaRDD lines = ctx.textFile(args[
0
]);
lines.saveAsTextFile(args[
1
]);
System.out.println(
"Copied file from "
+ args[
0
] +
" to "
+ args[
1
]);
ctx.stop();
}
}
|
其功能就是一个文件的拷贝
4,首先在本地的测试节点上创建文件夹
mkdir -p /opt/mydata/user/oozie/xwj_test/apps/shell/spark
5,在hdfs上创建目录 hdfs dfs -mkdir -p /user/oozie/xwj_test/apps/shell/spark
6,将上述文件上传到新建好的目录中
cd /opt/mydata/user/oozie/xwj_test/apps/shell/spark
6,将本地文件 上传到hdfs目录中
hdfs dfs -put ../spark/* /user/oozie/xwj_test/apps/shell/spark
7,查看hdfs上的目录文件是否存在
hdfs dfs -ls -r /user/oozie/xwj_test/apps/shell/spark
8,切换yarn用户重新提交任务
su yarn
oozie job -oozie http://hadoop-node0.novalocal:11000/oozie -config /opt/mydata/user/oozie/xwj_test/apps/shell/
email/job.properties -run
查看结果