Spark RDD编程-大数据课设

时间:2024-03-03 12:30:34

一、实验目的

1、熟悉Spark的RDD基本操作及键值对操作;
2、熟悉使用RDD编程解决实际具体问题的方法。

二、实验平台

操作系统:Ubuntu16.04
Spark版本:2.4.0
Python版本:3.4.3

三、实验内容、要求

1.pyspark交互式编程

本作业提供分析数据data.txt,该数据集包含了某大学计算机系的成绩,数据格式如下所示:

Tom,Algorithm,50
Tom,Datastructure,60
Jim,Database,90
Jim,Algorithm,60
Jim,Datastructure,80
……

请根据给定的实验数据,在pyspark中通过编程来计算以下内容:
(1)该系总共有多少学生;
(2)该系共开设了多少门课程;
(3)Tom同学的总成绩平均分是多少;
(4)求每名同学的选修的课程门数;
(5)该系DataBase课程共有多少人选修;
(6)各门课程的平均分是多少;
(7)使用累加器计算共有多少人选了DataBase这门课。

2.编写独立应用程序实现数据去重

对于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。本文给出门课的成绩(A.txt、B.txt)下面是输入文件和输出文件的一个样例,供参考。

输入文件A的样例如下:
20200101    x
20200102    y
20200103    x
20200104    y
20200105    z
20200106    z
输入文件B的样例如下:
20200101    y
20200102    y
20200103    x
20200104    z
20200105    y
根据输入的文件A和B合并得到的输出文件C的样例如下:
20200101    x
20200101    y
20200102    y
20200103    x
20200104    y
20200104    z
20200105    y
20200105    z
20200106    z

3.编写独立应用程序实现求平均值问题

每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。本文给出门课的成绩(Algorithm.txt、Database.txt、Python.txt),下面是输入文件和输出文件的一个样例,供参考。

Algorithm成绩:
小明 92
小红 87
小新 82
小丽 90
Database成绩:
小明 95
小红 81
小新 89
小丽 85
Python成绩:
小明 82
小红 83
小新 94
小丽 91
平均成绩如下:
 (小红,83.67)
 (小新,88.33)
 (小明,89.67)
 (小丽,88.67)

四、实验过程

实验数据准备:
1、将数据文件复制到/usr/local/spark/sparksqldata/目录下

hadoop@dblab-VirtualBox:~$ cd /usr/local/spark/sparksqldata
hadoop@dblab-VirtualBox:/usr/local/spark/sparksqldata$ ls
chapter4-data.txt
hadoop@dblab-VirtualBox:/usr/local/spark/sparksqldata$ cp -r /home/hadoop/桌面/ 大数据/* /usr/local/spark/sparksqldata
hadoop@dblab-VirtualBox:/usr/local/spark/sparksqldata$ ls
Algorithm.txt  B.txt              Database.txt  Python.txt A.txt      
chapter4-data.txt  data.txt

将数据文件放到实验目录下

(一)pyspark交互式编程

1、输入pyspark开启spark

hadoop@dblab-VirtualBox:/usr/local/spark/sparksqldata$ pyspark

开启spark
2、加载data.txt文件

lines = sc.textFile("file:///usr/local/spark/sparksqldata/data.txt")

加载数据
3、查看数据共有多少行

lines.count()
15022

查看数据集行数
4、去重查看数据,以防重复

lines.distinct().count()
1073

去重查看数据集行数
5、通过去重计算数据行数发现与总行数不一致,所以数据文件存在数据内容重复现象,此时要将数据过滤得到一个没有重复的数据集

data = lines.distinct()
data.count()
1073

保存去重后的数据集
6、完成各项需求
(1)该系总共有多少学生;

res = data.map(lambda x:x.split(",")).map(lambda x:x[0])
dis_res = res.distinct()
dis_res.count()
265

计算学生个数
(2)该系共开设了多少门课程;

res = data.map(lambda x:x.split(",")).map(lambda x:x[1])
dis_res = res.distinct()
dis_res.count()
8

在这里插入图片描述
(3)Tom同学的总成绩平均分是多少;

res = data.map(lambda x:x.split(",")).filter(lambda x:x[0]=="Tom")
score = res.map(lambda x:int(x[2]))
curriculum_num = res.count()
score_sum = score.reduce(lambda x,y:x+y)
score_avg = score_sum/curriculum_num 
score_avg
30.8

在这里插入图片描述
(4)求每名同学的选修的课程门数;

res = data.map(lambda x:x.split(",")).map(lambda x:(x[0],1))
res_curriculum = res.reduceByKey(lambda x,y:x+y)
res_curriculum.foreach(print)
res = data.map(lambda x:x.split(",")).map(lambda x:(x[0],1))
curriculum = res.reduceByKey(lambda x,y:x+y)
curriculum.foreach(print)
(\'Lewis\', 56)
(\'Mike\', 42)
(\'Walter\', 56)
(\'Conrad\', 28)
(\'Borg\', 56)
(\'Bert\', 42)
(\'Eli\', 70)
(\'Clare\', 56)
(\'Charles\', 42)
(\'Alston\', 56)
(\'Scott\', 42)
(\'Angelo\', 28)
(\'Christopher\', 56)
(\'Webb\', 98)
(\'Bill\', 28)
(\'Rock\', 84)
(\'Jonathan\', 56)

在这里插入图片描述
(5)该系DataBase课程共有多少人选修;

res = lines.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase").map(lambda x:x[0])
dis_res = res.distinct()
dis_res.count()
125

在这里插入图片描述
(6)各门课程的平均分是多少;

res = data.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase")
res.count()
126

在这里插入图片描述
(7)使用累加器计算共有多少人选了DataBase这门课。

res = data.map(lambda x:x.split(",")).map(lambda x:(x[1],(int(x[2]),1)))
temp = res.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
avg = temp.map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))
avg.foreach(print)
(\'ComputerNetwork\', 51.9)
(\'Software\', 50.91)
(\'DataBase\', 50.54)
(\'Algorithm\', 48.83)
(\'OperatingSystem\', 54.94)
(\'DataStructure\', 47.57)
(\'Python\', 57.82)
(\'CLanguage\', 50.61)
> res = data.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase")
summation = sc.accumulator(0)
res.foreach(lambda x:summation.add(1))
summation.value
126

在这里插入图片描述

(二)编写独立应用程序实现数据去重

假设/usr/local/spark/sparksqldata/目录是当前目录
1.在当前目录下创建一个merge.py的文件,用于编写程序代码
在这里插入图片描述
2.编写程序实现数据去重,并写入新文件
在这里插入图片描述

from pyspark import SparkContext

# 初始化SparkContext
sc = SparkContext("local","sparksqldata")

# 加载文件数据
linesA = sc.textFile("file:///usr/local/spark/sparksqldata/A.txt")
linesB = sc.textFile("file:///usr/local/spark/sparksqldata/B.txt")

# 合并文件数据
linesC = linesA.union(linesB)

# 去重
dis_linesC = linesC.distinct()

# 排序
res = dis_linesC.sortBy(lambda x:x)

# 将得到的数据写入一个新文件
res.repartition(1).saveAsTextFile("file:///usr/local/spark/sparksqldata/result")

3.运行程序代码后会在当前目录下生成一个新文件夹,新文件夹内有重新写入的文件
在这里插入图片描述
4.查看经过去重后新写入的文件内容
在这里插入图片描述

(三)编写独立应用程序实现求平均值问题

假设/usr/local/spark/sparksqldata/目录是当前目录
1.在当前目录下创建一个avgScore.py的文件,用于编写程序代码
在这里插入图片描述
2.编写程序求得各同学得平均成绩
在这里插入图片描述

from pyspark import SparkContext
sc = SparkContext("local","sparksqldata")

# 加载文件数据
lines1 = sc.textFile("file:///usr/local/spark/sparksqldata/Algorithm.txt")
lines2 = sc.textFile("file:///usr/local/spark/sparksqldata/Database.txt")
lines3 = sc.textFile("file:///usr/local/spark/sparksqldata/Python.txt")

# 合并文件
lines = lines1.union(lines2).union(lines3)

# 去除空行
newlines = lines.filter(lambda x:x is not None).filter(lambda x:x is not \'\')

# 拆分
data = newlines.map(lambda x:x.split(" ")).map(lambda x:(x[0],(int(x[1]),1)))

# 分组统计
res = data.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))

# 计算每个同学的平均成绩
result = res.map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))

3.运行程序后会在当前目录下生成一个新的文件夹,文件夹下存放着程序运行得结果

在这里插入图片描述
在这里插入图片描述

4.查看结果
在这里插入图片描述