1. Interactive programming with spark-shell
(1) How many students are there in the department in total?
Commands:
var tests=sc.textFile("file:///home/hadoop/studata/chapter5-data1.txt")
var par=tests.map(row=>row.split(",")(0))
var distinct_par=par.distinct()
distinct_par.count
Result:
(2) How many courses does the department offer in total?
Commands:
var tests=sc.textFile("file:///home/hadoop/studata/chapter5-data1.txt")
var par=tests.map(row=>row.split(",")(1))
var distinct_par=par.distinct()
distinct_par.count
Result:
(3) What is Tom's average score across all of his courses?
Commands:
var tests=sc.textFile("file:///home/hadoop/studata/chapter5-data1.txt")
var pars=tests.filter(row=>row.split(",")(0)=="Tom")
pars.foreach(println)
Result:
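The commands above only list Tom's records; the average itself still needs to be computed. A minimal sketch of doing that directly in the shell, assuming the same name,course,score column layout:
val tomScores=tests.filter(row=>row.split(",")(0)=="Tom").map(row=>row.split(",")(2).toInt)
tomScores.sum()/tomScores.count()  // total of Tom's scores divided by the number of his records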
(4) How many courses has each student taken?
Commands:
var tests=sc.textFile("file:///home/hadoop/studata/chapter5-data1.txt")
var pars=tests.map(row=>(row.split(",")(0),row.split(",")(1)))
pars.mapValues(x=>(x,1)).reduceByKey((x,y)=>(" ",x._2+y._2)).mapValues(x=>x._2).foreach(println)
Result (partial output; there are 265 entries in total):
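The reduceByKey call above carries a placeholder string along just so it can add up the counts; an equivalent and somewhat more direct sketch is:
pars.mapValues(x=>1).reduceByKey((x,y)=>x+y).foreach(println)  // one per (student,course) record, summed per student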
(5) How many students have taken the DataBase course?
Commands (the answer is the last line of the output):
var tests=sc.textFile("file:///home/hadoop/studata/chapter5-data1.txt")
var pars=tests.filter(row=>(row.split(",")(1)=="DataBase"))
pars.count
(6) What is the average score of each course?
Commands:
var tests=sc.textFile("file:///home/hadoop/studata/chapter5-data1.txt")
var pars=tests.map(row=>(row.split(",")(1),row.split(",")(2).toInt))
pars.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>(x._1.toDouble/x._2)).collect()  // toDouble avoids integer division when computing the average
Result:
(7) Use an accumulator to count how many students have taken the DataBase course.
Commands:
var tests=sc.textFile("file:///home/hadoop/studata/chapter5-data1.txt")
var pars=tests.filter(row=>(row.split(",")(1)=="DataBase")).map(row=>(row.split(",")(1),1))
var account=sc.longAccumulator("My Accumulator")
pars.values.foreach(x=>account.add(x))
account.value  // read the accumulated count back on the driver
Result:
2. Writing a standalone application for data deduplication
Given two input files A and B, write a standalone Spark application that merges the two files and removes the duplicated entries, producing a new file C. A sample of the input and output files is given below for reference. Sample content of input file A:
20170101 x
20170102 y
20170103 x
20170104 y
20170105 z
20170106 z
Sample content of input file B:
20170101 y
20170102 y
20170103 x
20170104 z
20170105 y
Sample content of the output file C obtained by merging A and B:
20170101 x
20170101 y
20170102 y
20170103 x
20170104 y
20170104 z
20170105 y
20170105 z
20170106 z
Create the project:
remdup.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner

object RemDup {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("RemDup")
    val sc = new SparkContext(conf)
    // read the two input files
    val A = sc.textFile("file:///home/hadoop/studata/A.txt")
    val B = sc.textFile("file:///home/hadoop/studata/B.txt")
    // merge the two RDDs, remove duplicates, and sort the remaining lines
    val C = A.union(B).distinct().sortBy(x => x, true)
    C.foreach(println)
    sc.stop()
  }
}
simple.sbt
name := "RemDup Project" version := "1.0" scalaVersion := "2.11.8" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
Package the project (for the sbt installation, see the earlier post Spark-寒假-实验3):
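A sketch of the usual packaging commands (the project directory and sbt path are assumptions; adapt them to your own setup):
cd ~/sparkapp/remdup              # hypothetical project root holding simple.sbt and src/main/scala/remdup.scala
/usr/local/sbt/sbt package        # produces a jar under target/scala-2.11/, e.g. remdup-project_2.11-1.0.jar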
Run the jar:
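A sketch of submitting the packaged jar with spark-submit, again with assumed paths:
/usr/local/spark/bin/spark-submit --class "RemDup" ~/sparkapp/remdup/target/scala-2.11/remdup-project_2.11-1.0.jar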
Result:
3. Writing a standalone application to compute average scores
The project is created the same way as above:
The program code is as follows:
average.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner

object Average {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Average")
    val sc = new SparkContext(conf)
    // read the per-course score files (one "name score" pair per line)
    val Algorimm = sc.textFile("file:///home/hadoop/studata/Algorimm.txt")
    val DataBase = sc.textFile("file:///home/hadoop/studata/DataBase.txt")
    val Python = sc.textFile("file:///home/hadoop/studata/Python.txt")
    // merge all records and turn each into (name, (score, 1))
    val allGradeAverage = Algorimm.union(DataBase).union(Python)
    val stuArrayKeyValue = allGradeAverage.map(x => (x.split(" ")(0), x.split(" ")(1).toDouble)).mapValues(x => (x, 1))
    // per student: sum of scores and number of courses taken
    val totalGrade = stuArrayKeyValue.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
    // divide to get the average, format to two decimals, and print
    val averageGrade = totalGrade.mapValues(x => (x._1.toDouble / x._2.toDouble).formatted("%.2f")).foreach(println)
    sc.stop()
  }
}
simple.sbt
name := "Average Project" version := "1.0" scalaVersion := "2.11.8" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
Package the project:
Run the jar:
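Packaging and submission mirror section 2; only the class and jar names change (paths again assumed):
/usr/local/spark/bin/spark-submit --class "Average" ~/sparkapp/average/target/scala-2.11/average-project_2.11-1.0.jar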
Result: