第1关:集合并行化创建RDD
任务描述
本关任务:计算并输出各个学生的总成绩。
相关知识
为了完成本关任务,你需要掌握:1.集合并行化创建RDD,2.reduceByKey算子、foreach算子
集合创建RDD
Spark会将集合中的数据拷贝到集群上去,形成一个分布式的数据集合,也就是一个RDD。相当于是,集合中的部分数据会到一个节点上,而另一部分数据会到其他节点上。然后就可以用并行的方式来操作这个分布式数据集合,即RDD。
val list=List(1,2,3,4,5)
val stu=sc.parallelize(list,3)//参数1:Seq集合,必须。参数2:分区数,默认为该Application分配到的资源的CPU核数
//val stu=sc.makeRDD(list,3) //和parallelize用法一样。(该用法可以指定每一个分区的preferredLocations)。
val sum=stu.reduce(_+_)
println(sum)
输出:15
reduceByKey()
对元素为RDD[K,V]对的RDD中Key相同的元素的Value进行聚合
val list = List(("spark",2),("hive",1),("hive",2))
val stuRDD = sc.makeRDD(list)
stuRDD.reduceByKey(_+_)
输出:
(spark,2)
(hive,3)
foreach() :对数据集中每一个元素运行函数
val list = List(1,2,3,4)
val stuRDD = sc.makeRDD(list)
stuRDD.foreach(print)//对这个RDD进行遍历输出
输出:1234
编程要求
根据提示,在右侧编辑器begin-end处补充代码,计算并输出各个学生的总成绩。
("bj",88): bj指学生姓名,88指学生成绩。
测试说明
平台会对你编写的代码进行测试:
预期输出:
(bj,254)
(sh,221)
(gz,285)
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object Student {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local")
val sc = new SparkContext(conf)
val arr1=Array(("bj",88),("sh",67),("gz",92))
val arr2=Array(("bj",94),("sh",85),("gz",95))
val arr3=Array(("bj",72),("sh",69),("gz",98))
/********** begin **********/
// 第一步:先将数组进行合并
val combinedArr = arr1 ++ arr2 ++ arr3
// 第二步:创建RDD
val rdd = sc.parallelize(combinedArr)
// 第三步:把相同key的进行聚合
val totalScores = rdd.reduceByKey(_ + _)
// 第四步:输出
totalScores.collect().foreach(println)
/********** end **********/
sc.stop()
}
}
这段代码首先创建了一个配置为本地模式的 SparkConf 和 SparkContext。然后,它定义了三个包含学生姓名和成绩的数组。在 begin 和 end 之间,代码执行了以下步骤:
使用 ++ 运算符将三个数组合并成一个数组。
使用 sc.parallelize(combinedArr) 将合并后的数组并行化,创建一个 RDD。
使用 reduceByKey(_ + _) 对 RDD 中的相同键(学生姓名)的值(成绩)进行聚合,得到每个学生的总成绩。
使用 collect().foreach(println) 方法将分布式数据集中的结果收集到驱动程序中,并遍历打印每个学生的总成绩。
第2关:读取外部数据集创建RDD
任务描述
本关任务:读取文本文件,按照文本中数据,输出老师及其出现次数。
相关知识
为了完成本关任务,你需要掌握:1.读取文件创建RDD
,2.本关所需算子。
读取文件
textFile()
val rdd = sc.textFile("/home/student.txt")//文件路径
算子
(1)map:对RDD
中的每个元素都执行一个指定的函数来产生一个新的RDD
。
scala> val numbers=sc.makeRDD(List(1,2,3,4))
scala> numbers.map((i: Int) => i * 2)
res0: List[Int] = List(2, 4, 6, 8)
(2) reduceByKey() :对元素为RDD[K,V]
对的RDD
中Key
相同的元素的Value
进行聚合。
val list = List(("spark",2),("hive",1),("hive",2))
val stuRDD = sc.makeRDD(list)
stuRDD.reduceByKey(_+_).foreach(println)
输出: (spark,2)
(hive,3)
(3)sortBy() :对RDD
中的元素进行排序。
val list = List(("spark",2),("hive",1),("hadoop",4))
val stuRDD = sc.makeRDD(list)
stuRDD.sortBy(_._2,false,3)//参数1:函数,排序规则有这个函数决定(必填) 参数2:升降序(true为升序,false为降序) 参数3:分区数量
编程要求
根据提示,在右侧编辑器begin-end
处补充代码,按每位老师出现的次数降序,输出老师姓名和出现次数。
- 输入文件样例:
bigdata,laozhang
bigdata,laoduan
javaee,xiaoxu
bigdata
指科目,laozhang
指老师名称。
测试说明
平台会对你编写的代码进行测试:
预期输出: (laozhao,15)
(laoyang,9)
(laoduan,5)
(xiaoxu,4)
(laoli,3)
(laozhang,2)
(laoliu,1)
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object Teachers {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Teachers").setMaster("local")
val sc = new SparkContext(conf)
val dataFile = "file:///root/step2_files"
/********** begin **********/
// 第一步:以外部文件方式创建RDD
val rdd = sc.textFile(dataFile)
// 第二步:将文件中每行的数据切分,得到自己想要的返回值
val teacherRdd = rdd.map(line => (line.split(",")(1), 1))
// 第三步:将相同的key进行聚合
val aggregatedRdd = teacherRdd.reduceByKey(_ + _)
// 第四步:按出现次数进行降序
val sortedRdd = aggregatedRdd.sortBy(_._2, false)
// 第五步:输出
sortedRdd.collect().foreach(println)
/********** end **********/
sc.stop()
}
}
这段代码首先创建了一个配置为本地模式的 SparkConf 和 SparkContext。然后,它定义了要读取的文本文件路径。在 begin 和 end 之间,代码执行了以下步骤:
使用 sc.textFile(dataFile) 从外部文件创建一个 RDD。
使用 map 函数将每行数据切分为老师名称和科目,并创建一个由老师名称和出现次数(初始为1)组成的元组 RDD。
使用 reduceByKey 函数对 RDD 中的相同老师名称的记录进行聚合,得到每位老师的总出现次数。
使用 sortBy 函数按照出现次数进行降序排序。
使用 collect().foreach(println) 方法将排序后的结果收集到驱动程序中,并遍历打印每位老师的姓名和出现次数。