1、需求描述
本次实验需要:系统:linux unbuntu14.04,处理器:至少需要两个处器,一个内核,内存:至少4G,硬盘空间:大小需要20GB。Hadoop:2.7.1以上版本,JDK:1.8以上版本,Spark:2.4.0以上版本,Python:3.6以上版本。
1、根据的数据分析某大学计算机系的成绩
(1)该系总共有多少学生;
(2)该系共开设了多少门课程;
(3)Tom同学的总成绩平均分是多少;
(4)求每名同学的选修的课程门数;
(5)该系DataBase课程共有多少人选修;
(6)各门课程的平均分是多少;
(7)使用累加器计算共有多少人选了DataBase这门课。
2、编写独立应用程序实现数据去重
对于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C
3、编写独立应用程序实现求平均值问题
每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中
2、环境介绍
环境准备:
- Hadoop下载与安装
- /share/init?surl=mUR3M2U_lbdBzyV_p85eSA
(提取码:99bg)进入该百度云盘链接后,找到Hadoop安装文件hadoop-2.7.。
2、下载完后还需要配置必备工作才能安装hadoop。
(1)首先创建Hadoop用户,sudo useradd -m hadoop -s /bin/bash
(2)设置Hadoop用户密码,sudo passwd Hadoop
(3)为Hadoop用户增加管理员权限,sudo adduser hadoop sudo
(4)使用Hadoop用户登录后需要更新apt,sudo apt-get update
(5)安装vim, sudo apt-get install vim
(6)安装JAVA环境
sudo apt-get install openjdk-7-jre openjdk-7-jdk
(7) 安装好 OpenJDK 后,需要找到相应的安装路径,这个路径是用于配置 JAVA_HOME 环境变量的。
dpkg -L openjdk-7-jdk | grep '/bin/javac'
(8) 接着需要配置一下 JAVA_HOME 环境变量,为方便,我们在 ~/.bashrc 中进行设置, sudo vim ~/.bashrc
(9) 在文件最前面添加如下单独一行(注意 = 号前后不能有空格),将“JDK安装路径”改为上述命令得到的路径,并保存:
(10)刷新环境变量,source ~/.bashrc
(11)安装Hadoop,我们选择将 Hadoop 安装至 /usr/local/ 中
sudo tar -zxf ~/下载/hadoop-2.6. -C /usr/local
# 解压到/usr/local中
cd /usr/local/
sudo mv ./hadoop-2.6.0/ ./hadoop # 将文件夹名改为hadoop
sudo chown -R hadoop ./hadoop # 修改文件权限
(12) Hadoop 解压后即可使用。输入如下命令来检查 Hadoop 是否可用,成功则会显示 Hadoop 版本信息:
cd /usr/local/hadoop
./bin/hadoop version
- Spark的下载安装
(1)Spark官方下载地址:/
(2) 这里是Local模式(单机模式)的 Spark安装。我们选择Spark 1.6.2版本,并且假设当前使用用户名hadoop登录了Linux操作系统。
sudo tar -zxf ~/下载/spark-1.6. -C /usr/local/ #解压
cd /usr/local
sudo mv ./spark-1.6.2-bin-without-hadoop/ ./spark #移动文件
sudo chown -R hadoop:hadoop ./spark #此处的hadoop 为你的用户名授权
(3) 安装后,还需要修改Spark的配置文件
cd /usr/local/spark
cp ./conf/ ./conf/
(4) 编辑文件(vim ./conf/),在第一行添加以下配置信息:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
有了上面的配置信息以后,Spark就可以把数据存储到Hadoop分布式文件系统HDFS中,也可以从HDFS中读取数据。如果没有配置上面信息,Spark就只能读写本地数据,无法读写HDFS数据。配置完成后就可以直接使用,不需要像Hadoop运行启动命令。
3、数据来源描述
数据来源于期末大作业素材,分别是,,,,,六个文件,由于学校系统不支持双向复制粘贴(支持双向复制粘贴跳过),所以在windows系统中使用FileZilla软件传输到virtualbox虚拟机上,具体步骤如下:
- 在虚拟机网络设置为桥接模式,并打开虚拟机终端输入ifconfig,查看本地ip并复制,如无请刷新网络重试。
- 打开FileZilla,打开文件,站点管理器,新建站点,主机输入刚刚复制的ip地址,用户名输入hadoop,密码输入你虚拟机hadoop用户下的密码,点击连接。
连接成功便可以传输文件了。
4、数据上传及上传结果查看
5、数据处理过程描述
pyspark交互式编程
1、该系总共有多少学生:
-
lines=(“file:///usr/local/spark/sparksqldata/data.txt”)//获取data.txt文件
-
res = lines.map(lambda x:(“,”)).map(lambda x: x[0]) //获取每行数据的第1列
-
sum = ()// distinct去重
-
sum.count()//取元素总个数265
2、该系共开设了多少门课程;
-
lines = (“file:///usr/local/spark/sparksqldata/data.txt”) //获取data.txt文件
-
res = lines.map(lambda x:(“,”)).map(lambda x:x[1]) //获取每行数据的第2列
-
dis_res = ()//distinct去重
-
dis_res.count()//取元素总个数8
3、Tom同学的总成绩平均分是多少;
-
lines=(“file:///usr/local/spark/sparksqldata/data.txt”) //获取data.txt文件
-
res = lines.map(lambda x:(“,”)).filter(lambda x:x[0]==”Tom”) //筛选出Tom同学的成绩信息
-
(print)//循环输出
-
score = (lambda x:int(x[2]))//提取Tom同学的每门成绩,并转换为int类型
-
num = res.count() //Tom同学选课门数
-
sum_score = (lambda x,y:x+y) //Tom同学的总成绩
-
avg = sum_score/num // 总成绩/门数=平均分
-
print(avg)//输出平均分
4、求每名同学的选修的课程门数;
-
lines=(“file:///usr/local/spark/sparksqldata/data.txt”) //获取data.txt文件
-
res = lines.map(lambda x:(“,”)).map(lambda x:(x[0],1)) //学生每门课程都对应(学生姓名,1),学生有n门课程则有n个(学生姓名,1)
-
each_res = (lambda x,y: x+y) //按学生姓名获取每个学生的选课总数
-
each_res.foreach(print)//循环输出
5、该系DataBase课程共有多少人选修;
-
lines=(“file:///usr/local/spark/sparksqldata/data.txt”) //获取data.txt文件
-
res=lines.map(lambdax:(“,”)).filter(lambda x:x[1]==”DataBase”)
-
res.count()//使用count统计
6、各门课程的平均分是多少;
-
lines=(“file:///usr/local/spark/sparksqldata/data.txt”) //获取Data.txt文件
-
res=lines.map(lambdax:(“,”)).map(lambdax:(x[1],(int(x[2]),1))) //为每门课程的分数后面新增一列1,表示1个学生选择了该课程。
-
temp = (lambda x,y:(x[0]+y[0],x[1]+y[1])) //按课程名聚合课程总分和选课人数。格式如(‘ComputerNetwork’, (7370, 142))
-
avg = (lambda x:(x[0], round(x[1][0]/x[1][1],2)))//课程总分/选课人数 = 平均分,并利用round(x,2)保留两位小数
-
(print)//循环输出
7、使用累加器计算共有多少人选了DataBase这门课。
-
lines=(“file:///usr/local/spark/sparksqldata/data.txt”) //获取data.txt文件
-
res=lines.map(lambdax:(“,”)).filter(lambda x:x[1]==”DataBase”)//筛选出选了DataBase课程的数据
-
accum = (0) //定义一个从0开始的累加器accum
-
(lambda x:accum.add(1))//遍历res,每扫描一条数据,累加器加1
-
accum.value //输出累加器的最终值1764
编写独立应用程序实现数据去重
1、 导入SparkContext包
2、 初始化SparkContext
3、 加载两个文件A和B
4、 使用union合并两个文件的内容
5、 使用distinct去重操作
6、 使用sortBy排序操作
7、 将结果写入result文件中,repartition(1)的作用是让结果合并到一个文件中,不加的话会结果写入到两个文件
-
from pyspark import SparkContext
-
sc=SparkContext('local','sparksqldata')
-
lines1 = ("file:///usr/local/spark/sparksqldata/")
-
lines2 = ("file:///usr/local/spark/sparksqldata/")
-
lines = lines1.union(lines2)
-
dis_lines=lines.distinct()
-
res = dis_lines.sortBy(lambda x:x)
-
(1).saveAsTextFile("file:///usr/local/spark/sparksqldata/result")
总共500行数据,截图截至第一页。
编写独立应用程序实现求平均值问题
-
from pyspark import SparkContext
-
sc = SparkContext("local","sparksqldata")
-
lines1 = ("file:///usr/local/spark/sparksqldata/")
-
lines2 = ("file:///usr/local/spark/sparksqldata/")
-
lines3 = ("file:///usr/local/spark/sparksqldata/")
-
lines = lines1.union(lines2).union(lines3)
-
data = lines.map(lambda x:(" ")).map(lambda x:(x[0],(int(x[1]),1)))
-
res = data.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
-
data = (lambda x:(x[0],round(x[1][0]/x[1][1],2)))
-
data.repartition(1).saveAsTextFile("file:///usr/local/spark/sparksqldata/result1")
6、经验总结
Spark是一个基于内存的快速的用于大规模数据处理的统一分析引擎。Spark有容错、并行的特性。spark发展迅猛,框架比hadoop更加灵活实用。减少了延时处理,提高性能效率实用灵活性,也可以与hadoop切实相互结合。RDD(弹性分布式数据集)是Spark的核心数据模型,也是一个抽象的元素集合,包含有数据。弹性体现在RDD的数据默认情况下是存储在内存中的,如果内存中存储不下,spark会自动将RDD中的数据写入到磁盘中。
经过这次期末大作业加深了对pyspark的印象,实验中使用编程了计算数据,首先先创建RDD,然后使用Map方法拆分每行记录,取出每行的第某个元素,然后使用方法实现结果。count方法来计算总个数,distinct方法去除重复数据,round方法保留小数等等。还有许多方法今后仍需继续学习,能够达到灵活运用的程度。
在这次大作业中发现,对spark和RDD编程还有许多不足,对RDD的理解不够深刻,对代码实际运用还有很多不足的地方,在今后的学习中,仍需要努力学习。
参考文献
[1]Hadoop3.1.3安装教程_单机/伪分布式配置_Hadoop3.1.3/Ubuntu18.04(16.04)_厦大数据库实验室博客[J/OL]. /blog/2441-2/.
[2] Spark安装和编程实践(Spark2.4.0)[J/OL]. /blog/2501-2/