001 - Creating a Spark Project with IDEA

Date: 2022-01-14 01:19:55


1. Environment

Hadoop 2.7.1

Spark 1.6.0

Scala 2.10.5

JDK 1.7

IntelliJ IDEA 15.0.2

2. Creating the First SBT Project

2.1. File -> New -> Project

(1) Select Scala -> SBT, then click Next.

 


 

(2) Enter a project name under "Project name", e.g. sparkdemo, then click "Finish".

 


 

2.2. SBT Project Structure
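
A newly generated SBT project in IDEA typically has a layout along these lines (exact contents depend on the SBT and plugin versions):

sparkdemo/
  build.sbt          <- project definition and dependencies
  project/           <- SBT build support files (e.g. build.properties)
  src/
    main/scala/      <- application sources
    test/scala/      <- test sources
  target/            <- compiled classes and packaged JARs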

 


 

2.3. Configuring Dependencies in an SBT Project

A project generated with SBT contains a build.sbt file (similar to Maven's pom.xml); this is where you add the dependencies your project needs.
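
As a minimal illustration, a build.sbt for this setup could start out like this (the full list of dependencies used in this article appears in section 3.1):

name := "sparkdemo"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0"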

 


 

3. Writing a WordCount Program in Spark

3.1. Adding Dependencies

Add the following to the build.sbt file in the root of the sparkdemo project:

name := "sparkdemo"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0"

libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0"

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0"

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.0"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"


 

3.2. Writing the WordCount Program

Right-click the project -> New -> Scala Class, choose the "Object" kind to create a .scala file, and then write your Spark program in it.


package com.ykinfo.demo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by fuli.shen on 2016/5/25.
  */
object WordCount {

  def main(args: Array[String]) {
    if (args.length != 2) {
      println("Usage: WordCount <inputPath> <outputPath>")
      sys.exit(1)
    }
    // 0 - Initialize the Spark context
    val config: SparkConf = new SparkConf()
        .setAppName("WordCount")
    val sc: SparkContext = new SparkContext(config)

    // 1 - Read the input data from HDFS
    val textFile: RDD[String] = sc.textFile(args(0))

    // 2 - Split each (tab-separated) line into words
    val words: RDD[String] = textFile.flatMap(line => line.split("\t"))

    // 3 - Map each word to (word, 1) and sum the counts per key
    val counts: RDD[(String, Int)] = words.map(word => (word, 1)).reduceByKey((a, b) => a + b)

    // 4 - Sort by the number of occurrences in descending order
    val sorted: RDD[(String, Int)] = counts.map(tuple => (tuple._2, tuple._1)).sortByKey(false).map(tuple => (tuple._2, tuple._1))

    // 5 - Save the result
    sorted.saveAsTextFile(args(1))
  }
}


3.3. Packaging and Deployment

Since the project is built with SBT, it can be packaged from the SBT command line. For example, on Windows:

D:\work\sparkdemo>sbt clean compile package

 

This produces a packaged JAR file, for example:

[info] Packaging D:\work\sparkdemo\target\scala-2.10\sparkdemo_2.10-1.0.jar

 

We can then upload sparkdemo_2.10-1.0.jar to the Spark cluster and run it.

3.4. Running the Spark Program

3.4.1. Run application locally on 2 cores

./bin/spark-submit \
  --class com.ykinfo.demo.WordCount \
  --master local[2] \
  /home/hadoop/work/sparkdemo_2.10-1.0.jar \
  data.txt \
  output1

 

Note: local[2] means the job runs locally using 2 worker threads.

3.4.2. Run on a Spark standalone cluster in client deploy mode

./bin/spark-submit \
  --class com.ykinfo.demo.WordCount \
  --master spark://10.1.1.8:7077 \
  --executor-memory 2G \
  --total-executor-cores 10 \
  /home/hadoop/work/sparkdemo_2.10-1.0.jar \
  data.txt output2

 

Result:

[hadoop@10-1-1-8 spark-1.6.0]$ hdfs dfs -cat output2/*
(hello,2)
(you,1)
(wo,1)
(helo,1)
(me,1)

 

After running the command above, we can check the state of the Spark cluster, as shown in the screenshot below:

 

 

[Screenshot: Spark standalone cluster web UI]

  

3.4.3. Run on a YARN cluster

export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

./bin/spark-submit \
  --class com.ykinfo.demo.WordCount \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 2G \
  --num-executors 10 \
  /home/hadoop/work/sparkdemo_2.10-1.0.jar \
  data.txt \
  output3

  

Execution log:

16/05/27 19:26:52 INFO impl.TimelineClientImpl: Timeline service address: http://10.1.1.8:8188/ws/v1/timeline/

16/05/27 19:26:53 INFO client.RMProxy: Connecting to ResourceManager at /10.1.1.8:8032

16/05/27 19:26:53 INFO yarn.Client: Requesting a new application from cluster with 3 NodeManagers

16/05/27 19:26:53 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)

16/05/27 19:26:53 INFO yarn.Client: Will allocate AM container, with 1408 MB memory including 384 MB overhead

16/05/27 19:26:53 INFO yarn.Client: Setting up container launch context for our AM

16/05/27 19:26:53 INFO yarn.Client: Setting up the launch environment for our AM container

16/05/27 19:26:53 INFO yarn.Client: Preparing resources for our AM container

16/05/27 19:26:53 INFO yarn.Client: Uploading resource file:/home/hadoop/caishi/local/spark-1.6.0/lib/spark-assembly-1.6.0-hadoop2.6.0.jar -> hdfs://10.1.1.8:9000/user/hadoop/.sparkStaging/application_1461847360200_1033/spark-assembly-1.6.0-hadoop2.6.0.jar

16/05/27 19:26:55 INFO yarn.Client: Uploading resource file:/home/hadoop/work/sparkdemo_2.10-1.0.jar -> hdfs://10.1.1.8:9000/user/hadoop/.sparkStaging/application_1461847360200_1033/sparkdemo_2.10-1.0.jar

16/05/27 19:26:55 INFO yarn.Client: Uploading resource file:/tmp/spark-c3b21f91-35b7-4153-a508-928db0b6ece8/__spark_conf__4278404195288929205.zip -> hdfs://10.1.1.8:9000/user/hadoop/.sparkStaging/application_1461847360200_1033/__spark_conf__4278404195288929205.zip

16/05/27 19:26:55 INFO spark.SecurityManager: Changing view acls to: hadoop

16/05/27 19:26:55 INFO spark.SecurityManager: Changing modify acls to: hadoop

16/05/27 19:26:55 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)

16/05/27 19:26:55 INFO yarn.Client: Submitting application 1033 to ResourceManager

16/05/27 19:26:55 INFO impl.YarnClientImpl: Submitted application application_1461847360200_1033

16/05/27 19:26:56 INFO yarn.Client: Application report for application_1461847360200_1033 (state: ACCEPTED)

16/05/27 19:26:56 INFO yarn.Client:

         client token: N/A

         diagnostics: N/A

         ApplicationMaster host: N/A

         ApplicationMaster RPC port: -1

         queue: default

         start time: 1464348415888

         final status: UNDEFINED

         tracking URL: http://10-1-1-8:8088/proxy/application_1461847360200_1033/

         user: hadoop

16/05/27 19:26:57 INFO yarn.Client: Application report for application_1461847360200_1033 (state: ACCEPTED)

16/05/27 19:26:58 INFO yarn.Client: Application report for application_1461847360200_1033 (state: ACCEPTED)

16/05/27 19:26:59 INFO yarn.Client: Application report for application_1461847360200_1033 (state: ACCEPTED)

16/05/27 19:27:00 INFO yarn.Client: Application report for application_1461847360200_1033 (state: ACCEPTED)

16/05/27 19:27:01 INFO yarn.Client: Application report for application_1461847360200_1033 (state: ACCEPTED)

16/05/27 19:27:02 INFO yarn.Client: Application report for application_1461847360200_1033 (state: RUNNING)

16/05/27 19:27:02 INFO yarn.Client:

         client token: N/A

         diagnostics: N/A

         ApplicationMaster host: 10.1.1.10

         ApplicationMaster RPC port: 0

         queue: default

         start time: 1464348415888

         final status: UNDEFINED

         tracking URL: http://10-1-1-8:8088/proxy/application_1461847360200_1033/

         user: hadoop

16/05/27 19:27:03 INFO yarn.Client: Application report for application_1461847360200_1033 (state: RUNNING)

16/05/27 19:27:04 INFO yarn.Client: Application report for application_1461847360200_1033 (state: RUNNING)

16/05/27 19:27:05 INFO yarn.Client: Application report for application_1461847360200_1033 (state: RUNNING)

16/05/27 19:27:06 INFO yarn.Client: Application report for application_1461847360200_1033 (state: RUNNING)

16/05/27 19:27:07 INFO yarn.Client: Application report for application_1461847360200_1033 (state: RUNNING)

16/05/27 19:27:08 INFO yarn.Client: Application report for application_1461847360200_1033 (state: RUNNING)

16/05/27 19:27:09 INFO yarn.Client: Application report for application_1461847360200_1033 (state: RUNNING)

16/05/27 19:27:11 INFO yarn.Client: Application report for application_1461847360200_1033 (state: RUNNING)

16/05/27 19:27:12 INFO yarn.Client: Application report for application_1461847360200_1033 (state: RUNNING)

16/05/27 19:27:13 INFO yarn.Client: Application report for application_1461847360200_1033 (state: RUNNING)

16/05/27 19:27:14 INFO yarn.Client: Application report for application_1461847360200_1033 (state: FINISHED)

16/05/27 19:27:14 INFO yarn.Client:

         client token: N/A

         diagnostics: N/A

         ApplicationMaster host: 10.1.1.10

         ApplicationMaster RPC port: 0

         queue: default

         start time: 1464348415888

         final status: SUCCEEDED

         tracking URL: http://10-1-1-8:8088/proxy/application_1461847360200_1033/

         user: hadoop

16/05/27 19:27:14 INFO util.ShutdownHookManager: Shutdown hook called

16/05/27 19:27:14 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-c3b21f91-35b7-4153-a508-928db0b6ece8

  

Result:

[hadoop@10-1-1-8 spark-1.6.0]$ hdfs dfs -cat output3/*
(hello,2)
(you,1)
(wo,1)
(helo,1)
(me,1)

  

The YARN cluster is shown in the screenshot below (note: the Spark standalone cluster shows no running tasks, because the job ran through YARN):

 

[Screenshot: YARN cluster web UI]

4. RDD Operations

RDDs provide an abstraction over distributed data. Operations on an RDD fall into two categories: transformations and actions.

Transformation: creates a new dataset from an existing one. Transformations are logical descriptions of how the data should be processed; no actual computation happens when they are called.

Action: submits a job made up of all the transformations accumulated since the previous action and triggers the actual computation. Spark splits an application into multiple jobs at action boundaries.
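
For example, in a sketch like the following (assuming sc is a SparkContext such as the one created in the WordCount program, and logs.txt is a hypothetical input file), nothing is read or computed until the final count() action runs:

val lines = sc.textFile("logs.txt")              // transformation: nothing is read yet
val errors = lines.filter(_.contains("ERROR"))   // transformation: only extends the lineage
val pairs = errors.map(line => (line, 1))        // transformation
val counts = pairs.reduceByKey(_ + _)            // transformation (wide: requires a shuffle)
val total = counts.count()                       // action: the job is submitted and executed here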

 

4.1. Transformations

4.1.1. Basic Transformations

map(func): Passes each element of the dataset through the function func to form a new distributed dataset.

filter(func): Returns a new dataset containing only the elements for which func returns true.

flatMap(func): Similar to map, but each input element can be mapped to zero or more output elements.

repartition(numPartitions): Randomly reshuffles the data in the RDD into numPartitions partitions; this always shuffles all data over the network.
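
A small sketch of these transformations, assuming sc is an existing SparkContext (the data below is made up for illustration):

val nums = sc.parallelize(1 to 10, 2)                 // a test RDD with 2 partitions
val doubled = nums.map(_ * 2)                         // map: apply a function to every element
val evens = doubled.filter(_ % 4 == 0)                // filter: keep elements matching the predicate
val words = sc.parallelize(Seq("a b", "c d e"))
  .flatMap(_.split(" "))                              // flatMap: one input -> zero or more outputs
val spread = evens.repartition(4)                     // repartition: reshuffle into 4 partitions (full shuffle)
println(spread.collect().mkString(","))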

 

4.1.2. Key-Value Transformations

groupByKey: When called on a dataset of (K, V) pairs, groups the values by key and returns a dataset of (K, Iterable<V>) pairs.

reduceByKey(func, [numTasks]): When called on a dataset of (K, V) pairs, aggregates the values for each key with the given func and returns a dataset of (K, V) pairs; func must be of type (V, V) => V.

sortByKey: Returns the (K, V) dataset sorted by key.

join: When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs.
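
A small sketch of the key-value transformations, again assuming an existing SparkContext sc and made-up data:

val scores = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 2)))
val ages = sc.parallelize(Seq(("a", 20), ("b", 30)))

val grouped = scores.groupByKey()        // ("a", [1, 3]), ("b", [2])
val summed = scores.reduceByKey(_ + _)   // ("a", 4), ("b", 2)
val sorted = summed.sortByKey()          // sorted by key, ascending
val joined = summed.join(ages)           // ("a", (4, 20)), ("b", (2, 30))
joined.collect().foreach(println)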

 

4.2. Actions

4.2.1. Common Actions

reduce(func): Aggregates all elements of the dataset using func, which takes two arguments and returns one value.

collect(): Returns all elements of the dataset to the driver program as an array.

count(): Returns the number of elements in the dataset.

first(): Returns the first element of the dataset.

take(n): Returns an array with the first n elements of the dataset.

foreach(func): Runs func on each element of the dataset.
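
A small sketch of the common actions, assuming an existing SparkContext sc:

val nums = sc.parallelize(Seq(5, 3, 8, 1))

val sum = nums.reduce(_ + _)     // 17: combine all elements with a two-argument function
val all = nums.collect()         // Array(5, 3, 8, 1), brought back to the driver
val n = nums.count()             // 4
val head = nums.first()          // 5
val firstTwo = nums.take(2)      // Array(5, 3)
nums.foreach(x => println(x))    // runs on the executors; output goes to the executor logs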

 

4.2.2. Storage Actions

saveAsTextFile(path): Writes the elements of the dataset as a text file (or set of text files) in the given directory of the local filesystem, HDFS, or another Hadoop-supported filesystem.

saveAsSequenceFile(path): Saves the dataset as a Hadoop SequenceFile at the given path.

saveAsObjectFile(path): Writes the elements of the dataset in a simple format using Java serialization.
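
A small sketch of the storage actions; the output paths are hypothetical, and each call writes one part file per partition:

val pairs = sc.parallelize(Seq(("hello", 2), ("you", 1)))

pairs.saveAsTextFile("out/text")      // plain text, one element per line
pairs.saveAsSequenceFile("out/seq")   // Hadoop SequenceFile of key/value pairs
pairs.saveAsObjectFile("out/obj")     // elements serialized with Java serialization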

 

 

5. Summary

This chapter showed how to develop a Spark program with IDEA and SBT, walked through a WordCount program as a first example, covered packaging and the different ways of submitting the application, and summarized the basic RDD operations.