1. Environment Setup
Hadoop 2.7.1
Spark1.6.0
Scala2.10.5
JDK1.7
IntelliJ IDEA 15.0.2
2. Creating the First SBT Project
2.1. File -> New -> Project
(1) Select Scala -> SBT, then click Next.
(2) Enter the project name in the Project name field, e.g. sparkdemo, then click "Finish".
2.2. SBT Project Structure
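A project generated this way typically has roughly the following layout (the exact files may vary slightly with the IDEA and SBT versions):
sparkdemo/
  build.sbt          -- project definition and dependencies
  project/           -- SBT build support files (build.properties, plugins.sbt)
  src/
    main/scala/      -- application source code
    test/scala/      -- test source code
  target/            -- compiled classes and the packaged jar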
2.3. Configuring Project Dependencies in SBT
A project generated with SBT contains a build.sbt file (similar to Maven's pom.xml), where you add the dependencies your project needs; the full build.sbt used in this tutorial is shown in Section 3.1.
3. Writing a WordCount Program in Spark
3.1. Adding Dependencies
Add the following to the build.sbt file in the root directory of the sparkdemo project:
name := "sparkdemo"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" %"1.6.0"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.0"
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
3.2. Writing the WordCount Program
Right-click the project -> New -> Scala Class -> choose "Object" to create a .scala file, then write your Spark program in it.
package com.ykinfo.demo
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
/**
* Created by fuli.shen on 2016/5/25.
*/
object WordCount {

  def main(args: Array[String]) {
    if (args.length != 2) {
      println("Usage: <inputPath> <outputPath>")
      sys.exit(1)
    }
    // 0 - Initialization
    val config: SparkConf = new SparkConf()
      .setAppName("WordCount")
    val sc: SparkContext = new SparkContext(config)
    // 1 - Read data from HDFS
    val textFile: RDD[String] = sc.textFile(args(0))
    // 2 - Split each line into words
    val flatMap: RDD[String] = textFile.flatMap(x => x.split("\t"))
    // 3 - Map each word to (word, 1) and sum the counts by key
    val shuffleRDD: RDD[(String, Int)] = flatMap.map(word => (word, 1)).reduceByKey((a, b) => a + b)
    // 4 - Sort by word count in descending order
    val mapRDD: RDD[(String, Int)] = shuffleRDD.map(tuple => (tuple._2, tuple._1)).sortByKey(false).map(tuple => (tuple._2, tuple._1))
    // 5 - Save the result
    mapRDD.saveAsTextFile(args(1))
  }
}
3.3. Packaging and Deployment
Since this project is built with SBT, it can be packaged with the SBT command line. For example, on Windows:
D:\work\sparkdemo>sbt clean compile package
This produces a packaged jar, for example:
[info] Packaging D:\work\sparkdemo\target\scala-2.10\sparkdemo_2.10-1.0.jar
Now we can upload sparkdemo_2.10-1.0.jar to the Spark cluster and run it.
3.4. Running the Spark Program
3.4.1. Run application locally on 2 cores
./bin/spark-submit \
--class com.ykinfo.demo.WordCount \
--master local[2] \
/home/hadoop/work/sparkdemo_2.10-1.0.jar \
data.txt \
output1
Note: local[2] means the application runs locally with 2 worker threads.
3.4.2. Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
--class com.ykinfo.demo.WordCount \
--master spark://10.1.1.8:7077 \
--executor-memory 2G \
--total-executor-cores 10 \
/home/hadoop/work/sparkdemo_2.10-1.0.jar \
data.txt output2
Result:
[hadoop@10-1-1-8 spark-1.6.0]$ hdfs dfs -cat output2/*
(hello,2)
(you,1)
(wo,1)
(helo,1)
(me,1)
After the commands above complete, check the status of the Spark cluster, as shown in the figure:
3.4.3. Run on a YARN cluster
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
./bin/spark-submit \
--class com.ykinfo.demo.WordCount \
--master yarn \
--deploy-mode cluster \
--executor-memory 2G \
--num-executors 10 \
/home/hadoop/work/sparkdemo_2.10-1.0.jar \
data.txt \
output3
Execution log:
16/05/27 19:26:52 INFO impl.TimelineClientImpl: Timeline service address: http://10.1.1.8:8188/ws/v1/timeline/
16/05/27 19:26:53 INFO client.RMProxy: Connecting to ResourceManager at /10.1.1.8:8032
16/05/27 19:26:53 INFO yarn.Client: Requesting a new application from cluster with 3 NodeManagers
16/05/27 19:26:53 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
16/05/27 19:26:53 INFO yarn.Client: Will allocate AM container, with 1408 MB memory including 384 MB overhead
16/05/27 19:26:53 INFO yarn.Client: Setting up container launch context for our AM
16/05/27 19:26:53 INFO yarn.Client: Setting up the launch environment for our AM container
16/05/27 19:26:53 INFO yarn.Client: Preparing resources for our AM container
16/05/27 19:26:53 INFO yarn.Client: Uploading resource file:/home/hadoop/caishi/local/spark-1.6.0/lib/spark-assembly-1.6.0-hadoop2.6.0.jar -> hdfs://10.1.1.8:9000/user/hadoop/.sparkStaging/application_1461847360200_1033/spark-assembly-1.6.0-hadoop2.6.0.jar
16/05/27 19:26:55 INFO yarn.Client: Uploading resource file:/home/hadoop/work/sparkdemo_2.10-1.0.jar -> hdfs://10.1.1.8:9000/user/hadoop/.sparkStaging/application_1461847360200_1033/sparkdemo_2.10-1.0.jar
16/05/27 19:26:55 INFO yarn.Client: Uploading resource file:/tmp/spark-c3b21f91-35b7-4153-a508-928db0b6ece8/__spark_conf__4278404195288929205.zip -> hdfs://10.1.1.8:9000/user/hadoop/.sparkStaging/application_1461847360200_1033/__spark_conf__4278404195288929205.zip
16/05/27 19:26:55 INFO spark.SecurityManager: Changing view acls to: hadoop
16/05/27 19:26:55 INFO spark.SecurityManager: Changing modify acls to: hadoop
16/05/27 19:26:55 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
16/05/27 19:26:55 INFO yarn.Client: Submitting application 1033 to ResourceManager
16/05/27 19:26:55 INFO impl.YarnClientImpl: Submitted application application_1461847360200_1033
16/05/27 19:26:56 INFO yarn.Client: Application report for application_1461847360200_1033 (state: ACCEPTED)
16/05/27 19:26:56 INFO yarn.Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1464348415888
final status: UNDEFINED
tracking URL: http://10-1-1-8:8088/proxy/application_1461847360200_1033/
user: hadoop
16/05/27 19:26:57 INFO yarn.Client: Application report for application_1461847360200_1033 (state: ACCEPTED)
16/05/27 19:26:58 INFO yarn.Client: Application report for application_1461847360200_1033 (state: ACCEPTED)
16/05/27 19:26:59 INFO yarn.Client: Application report for application_1461847360200_1033 (state: ACCEPTED)
16/05/27 19:27:00 INFO yarn.Client: Application report for application_1461847360200_1033 (state: ACCEPTED)
16/05/27 19:27:01 INFO yarn.Client: Application report for application_1461847360200_1033 (state: ACCEPTED)
16/05/27 19:27:02 INFO yarn.Client: Application report for application_1461847360200_1033 (state: RUNNING)
16/05/27 19:27:02 INFO yarn.Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: 10.1.1.10
ApplicationMaster RPC port: 0
queue: default
start time: 1464348415888
final status: UNDEFINED
tracking URL: http://10-1-1-8:8088/proxy/application_1461847360200_1033/
user: hadoop
16/05/27 19:27:03 INFO yarn.Client: Application report for application_1461847360200_1033 (state: RUNNING)
16/05/27 19:27:04 INFO yarn.Client: Application report for application_1461847360200_1033 (state: RUNNING)
16/05/27 19:27:05 INFO yarn.Client: Application report for application_1461847360200_1033 (state: RUNNING)
16/05/27 19:27:06 INFO yarn.Client: Application report for application_1461847360200_1033 (state: RUNNING)
16/05/27 19:27:07 INFO yarn.Client: Application report for application_1461847360200_1033 (state: RUNNING)
16/05/27 19:27:08 INFO yarn.Client: Application report for application_1461847360200_1033 (state: RUNNING)
16/05/27 19:27:09 INFO yarn.Client: Application report for application_1461847360200_1033 (state: RUNNING)
16/05/27 19:27:11 INFO yarn.Client: Application report for application_1461847360200_1033 (state: RUNNING)
16/05/27 19:27:12 INFO yarn.Client: Application report for application_1461847360200_1033 (state: RUNNING)
16/05/27 19:27:13 INFO yarn.Client: Application report for application_1461847360200_1033 (state: RUNNING)
16/05/27 19:27:14 INFO yarn.Client: Application report for application_1461847360200_1033 (state: FINISHED)
16/05/27 19:27:14 INFO yarn.Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: 10.1.1.10
ApplicationMaster RPC port: 0
queue: default
start time: 1464348415888
final status: SUCCEEDED
tracking URL: http://10-1-1-8:8088/proxy/application_1461847360200_1033/
user: hadoop
16/05/27 19:27:14 INFO util.ShutdownHookManager: Shutdown hook called
16/05/27 19:27:14 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-c3b21f91-35b7-4153-a508-928db0b6ece8
Result:
[hadoop@10-1-1-8 spark-1.6.0]$ hdfs dfs -cat output3/*
(hello,2)
(you,1)
(wo,1)
(helo,1)
(me,1)
The YARN cluster is shown in the figure below (note: the Spark standalone cluster shows no running tasks because the job runs on YARN):
4. RDD Operations
RDDs provide an abstraction for distributed data and support two kinds of operations: transformations and actions.
Transformation: creates a new dataset from an existing one. Transformations are logical operations on the dataset and are lazy; no computation actually happens when they are called.
Action: submits a job made up of all the transformations accumulated since the previous action and computes it; Spark splits the application into jobs based on its actions.
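As a minimal sketch of the difference (assuming a SparkContext sc is already available, as in the WordCount program above, and a hypothetical input file data.txt):
val lines = sc.textFile("data.txt")            // transformation: nothing is read yet
val lengths = lines.map(line => line.length)   // transformation: still no computation
val total = lengths.reduce(_ + _)              // action: a job is submitted and executed here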
4.1. Transformations
4.1.1. Basic Transformations
Transformation | Description
map(func) | Passes each element of the dataset through the function func and returns a new distributed dataset
filter(func) | Returns a new dataset containing only the elements for which func returns true
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items
repartition(numPartitions) | Reshuffles the data in the RDD randomly; this always moves all data across the network
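A small sketch of these basic transformations (assuming an existing SparkContext sc; the sample data is made up for illustration):
val nums = sc.parallelize(1 to 10)
val doubled = nums.map(_ * 2)                       // map: apply a function to every element
val evens = doubled.filter(_ % 4 == 0)              // filter: keep elements for which the predicate is true
val lines = sc.parallelize(Seq("hello you", "hello me"))
val words = lines.flatMap(_.split(" "))             // flatMap: each line maps to zero or more words
val repartitioned = words.repartition(4)            // repartition: reshuffle the data into 4 partitions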
4.1.2. Key-Value Transformations
Transformation | Description
groupByKey() | Groups the values for each key and returns a dataset of (K, Iterable<V>) pairs
reduceByKey(func, [numTasks]) | Groups by key and aggregates the values for each key using the function func, returning a (K, V) dataset; func must be of type (V, V) => V
sortByKey([ascending]) | Returns a (K, V) dataset sorted by key
join(otherDataset) | Joins a (K, V) dataset with a (K, W) dataset and returns a dataset of (K, (V, W)) pairs
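A sketch of the key-value transformations (again assuming an existing SparkContext sc and made-up sample data):
val pairs = sc.parallelize(Seq(("hello", 1), ("you", 1), ("hello", 1)))
val grouped = pairs.groupByKey()                    // (K, Iterable[V]): all values collected per key
val counts  = pairs.reduceByKey(_ + _)              // (K, V): values summed per key
val sorted  = counts.sortByKey(ascending = false)   // sorted by key, descending
val langs   = sc.parallelize(Seq(("hello", "en"), ("you", "en")))
val joined  = counts.join(langs)                    // (K, (V, W)): e.g. ("hello", (2, "en"))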
4.2. Actions
4.2.1. Common Actions
Action | Description
reduce(func) | Aggregates all elements of the dataset using the function func, which takes two arguments and returns one value
collect() | Returns all elements of the dataset to the driver program
count() | Returns the number of elements in the dataset
first() | Returns the first element of the dataset
take(n) | Returns an array with the first n elements of the dataset
foreach(func) | Runs the function func on each element of the dataset
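A sketch of the common actions (assuming an existing SparkContext sc):
val nums = sc.parallelize(1 to 5)
val sum = nums.reduce(_ + _)            // 15: aggregate all elements with a binary function
val all = nums.collect()                // Array(1, 2, 3, 4, 5), returned to the driver
val n = nums.count()                    // 5
val head = nums.first()                 // 1
val firstThree = nums.take(3)           // Array(1, 2, 3)
nums.foreach(x => println(x))           // runs on the executors; on a cluster the output appears in the executor logs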
4.2.2. Storage Actions
Action | Description
saveAsTextFile(path) | Writes the elements of the dataset as a text file (or set of text files) in the given directory on the local filesystem, HDFS, or another Hadoop-supported file system
saveAsSequenceFile(path) | Saves the dataset in SequenceFile format at the given path
saveAsObjectFile(path) | Writes the elements of the dataset in a simple format using Java serialization
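A sketch of the storage actions (assuming an existing SparkContext sc; the output paths are hypothetical and are resolved against the configured file system, e.g. HDFS):
val result = sc.parallelize(Seq(("hello", 2), ("you", 1)))
result.saveAsTextFile("wordcount-text")       // one part file per partition under the given directory
result.saveAsSequenceFile("wordcount-seq")    // keys and values written as a Hadoop SequenceFile
result.saveAsObjectFile("wordcount-obj")      // elements serialized with Java serialization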
5. Summary
This chapter introduced developing Spark programs with IntelliJ IDEA and SBT, walked through a Spark WordCount program as an introductory example, covered how to package and deploy the application, and summarized the basic RDD operations.