快速上手写spark代码系列:03-开始写一个spark小脚本(1)
训练背景设置
上一篇将了RDD操作的各种函数,这一节就把这些函数放在一起做一些事情。先确定一个场景,
- 我们有几类数据,每一类数据结构都不一样,但是有共同的字段,我们需要提取他们共同的字段,合并起来组成一个新的数据集。
- 这个数据集有很多用户,我们要筛选一部分用户。
- 同时,对这些用户,我们要过滤一些别的条件,最终得到一个结果集包含了用户的位置标识
- 然后要把这些用户的位置标识与另外一个数据集进行关联获取经纬度
- 最后把这些结果拼起来,存储10个文件,保存到hdfs目录
好了,我们先构建几个数据
数据1: A类位置数据
数据2: B类位置数据
数据3: C类位置数据
数据4: 待筛选用户列表
数据5: 包含经纬度的位置参数
第一步:准备数据集
A类位置数据(假设文件名为locA,存储在E盘下)
结构与数据如下:
id,name,time,location,type
001,tom,20170401,Loc1,A
001,tom,20170402,Loc1,A
001,tom,20170402,Loc2,A
002,alice,20170401,Loc1,A
003,jerry,20170401,Loc3,A
004,kate,20170401,Loc4,A
004,kate,20170403,Loc5,A
B类位置数据(假设文件名为locB,存储在E盘下)
结构与数据如下:
id,name,starttime,endtime,location,type
001,tom,20170331,20170401,Loc1,A
001,tom,20170331,20170401,Loc2,A
001,tom,20170331,20170402,Loc2,A
002,alice,20170331,20170401,Loc1,A
003,jerry,20170331,20170401,Loc3,A
004,kate,20170331,20170401,Loc4,A
004,kate,20170331,20170403,Loc5,A
C类位置数据(假设文件名为locC,存储在E盘下)
结构与数据如下:
id,name,desc,starttime,endtime,location,type
005,ada,good,20170330,20170403,Loc1,A
001,tom,bad,20170330,20170403,Loc2,A
006,mark,very good,20170330,20170404,Loc2,A
002,alice,perfect,20170330,20170403,Loc1,A
007,joyce,aweful,20170330,20170403,Loc3,A
004,kate,none,20170330,20170403,Loc4,A
008,jim,interesting,20170330,20170403,Loc5,A
009,leo,interesting,20170331,20170402,Loc5,A
待筛选用户列表(文件名为users,存储在E盘下)
数据如下:
001
003
004
006
007
009
位置参数数据(名字为locRef,存储在E盘下)
location,longitude,lattitue
Loc1,116.2,39.3
Loc2,117.1,40.2
Loc3,116.5,39.8
Loc4,116.9,39.2
Loc5,116.7,39.2
第二步:读取文件
把spark下载下来,配置好环境变量,点开cmd或者PowerShell,输入spark-shell,进入命令行:
scala> val locA = sc.textFile("E:\\locA.txt")
locA: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[35] at textFile at <console>:30
scala> val locB = sc.textFile("E:\\locB.txt")
locB: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[37] at textFile at <console>:30
scala> val locC = sc.textFile("E:\\locC.txt")
locC: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[39] at textFile at <console>:30
scala> val users = sc.textFile("E:\\users.txt")
users: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[66] at textFile at <console>:30
scala> val locRef = sc.textFile("E:\\locRef.txt")
locRef: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[41] at textFile at <console>:30
此处,sc代表了SparkContext,这里初始化了所需要的各种环境。
textFile函数读取各种格式数据生成RDD,类型是org.apache.spark.rdd.RDD[String]。
第三步:做字段提取,生成RDD
scala> val locARDD = locA.map(_.split(",")).map(x=>(x(0),(x(2),x(3))))
locARDD: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[43] at map at <console>:32
scala> val locBRDD = locB.map(_.split(",")).map(x=>(x(0),(x(3),x(4))))
locBRDD: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[45] at map at <console>:32
scala> val userRDD = users.map(x=>(x,x))
userRDD: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[68] at map at <console>:32
scala> val locCRDD = locC.map(_.split(",")).map(x=>(x(0),(x(4),x(5))))
locCRDD: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[47] at map at <console>:32
scala> val locRefRDD = locRef.map(_.split(",")).map(x=>(x(0),(x(1),x(2))))
locRefRDD: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[89] at map at <console>:32
这里
- 下划线_代表了每一行
- split函数把每一行按照指定的分隔符进行分割,本例子中分隔符是逗号
- split之后每一行分割成各个片段,每一个片段代表一列,编号从0开始
第四步:合并RDD
scala> val events = sc.union(locARDD,locBRDD,locCRDD)
events: org.apache.spark.rdd.RDD[(String, (String, String))] = UnionRDD[48] at union at <console>:42
查看数量
scala> events.count
res26: Long = 22
可以看出记录数为22,是三个RDD的集合。
我们在RDD的transformation函数介绍的时候详细介绍过union,union是一种RDD拼接的函数,不去重,所以不用shuffle,用法很简单,有几个rdd就用逗号分开即可。
第五步:过滤某些字段
假设把时间大于20170403或小于等于20170401的过滤掉,我们合并后的RDD结构为
id,time,location,所以把第二个字段做为过滤条件
scala> val filteredRDD = events.filter(x=>(x._2._1.toInt > 20170401 && x._2._1.toInt <= 20170403))
filteredRDD: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[64] at filter at <console>:44
过滤后的记录数目为:
scala> filteredRDD.count
res40: Long = 12
第六步:关联用户
scala> events.join(userRDD)
res43: org.apache.spark.rdd.RDD[(String, ((String, String), String))] = MapPartitionsRDD[71] at join at <console>:49
直接关联的时候我们发现,变成了一个(key,value)的pair,key为String,value是(value1,value2),其中value1是(colum1,coulm2)的一个tuple。
key就是id,用._1来取这个值
value用._2来取
value1是(time,location)组成的元组,用._2._1来取
value2是id,使用._2._2来取
此处我们只需要value1,我们试着打印一下看看
scala> events.join(userRDD).map(x=>x._1+"|"+x._2._1._1+"|"+x._2._1._2).foreach(println)
003|20170401|Loc3
007|20170403|Loc3
003|20170401|Loc3
004|20170401|Loc4
006|20170404|Loc2
004|20170403|Loc5
009|20170402|Loc5
001|20170401|Loc1
004|20170401|Loc4
001|20170402|Loc1
004|20170403|Loc5
001|20170402|Loc2
004|20170403|Loc4
001|20170401|Loc1
001|20170401|Loc2
001|20170402|Loc2
001|20170403|Loc2
由于我们来需要进一步关联,所以此处要把x._2._1._2做为key再重新生成一下RDD
scala> val filteredEvents = events.join(userRDD).map(x=>(x._2._1._2,(x._1,x._2._1._1)))
filteredEvents: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[87] at map at <console>:48
取两条数据看看
scala> filteredEvents.take(2)
res47: Array[(String, (String, String))] = Array((Loc2,(006,20170404)), (Loc3,(007,20170403)))
key变成了location,value是(id,time)组成的一个pair。
第七步:关联位置参数
此处我们使用leftOuterJoin试试
关联字段为location,所以两个RDD的key都必须是location
scala> filteredEvents.leftOuterJoin(locRefRDD)
res48: org.apache.spark.rdd.RDD[(String, ((String, String), Option[(String, String)]))] = MapPartitionsRDD[92] at leftOuterJoin at <console>:53
我们可以看出,value2,也就是(longtitue,lattitue)组成的pair变成了Option类型对象,这个就是我们在介绍leftOuterJoin的时候讨论的Some和None。
第八步:选取字段,生成新的结果
现在我们取id,time,location,longitue,lattitude字段组成新的结果集
scala> filteredEvents.leftOuterJoin(locRefRDD).map(x=>x._1+"|"+x._2._1._1+"|"+x._2._1._2+"|"+x._2._2.get._1+"|"+x._2._2.get._2).foreach(println)
Loc4|004|20170401|116.9|39.2
Loc5|009|20170402|116.7|39.2
Loc2|006|20170404|117.1|40.2
Loc3|007|20170403|116.5|39.8
Loc1|001|20170401|116.2|39.3
Loc3|003|20170401|116.5|39.8
Loc2|001|20170402|117.1|40.2
Loc5|004|20170403|116.7|39.2
Loc4|004|20170401|116.9|39.2
Loc5|004|20170403|116.7|39.2
Loc2|001|20170401|117.1|40.2
Loc3|003|20170401|116.5|39.8
Loc1|001|20170402|116.2|39.3
Loc2|001|20170402|117.1|40.2
Loc4|004|20170403|116.9|39.2
Loc2|001|20170403|117.1|40.2
Loc1|001|20170401|116.2|39.3
foreach用来打印每条记录
第九步,存储成指定文件数目
scala> filteredEvents.leftOuterJoin(locRefRDD).map(x=>x._1+"|"+x._2._1._1+"|"+x._2._1._2+"|"+x._2._2.get._1+"|"+x._2._2.get._2).coalesce(1)
res88: org.apache.spark.rdd.RDD[String] = CoalescedRDD[179] at coalesce at <console>:55
scala> filteredEvents.leftOuterJoin(locRefRDD).map(x=>x._1+"|"+x._2._1._1+"|"+x._2._1._2+"|"+x._2._2.get._1+"|"+x._2._2.get._2).coalesce(1,true)
res89: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[187] at coalesce at <console>:55
scala> filteredEvents.leftOuterJoin(locRefRDD).map(x=>x._1+"|"+x._2._1._1+"|"+x._2._1._2+"|"+x._2._2.get._1+"|"+x._2._2.get._2).repartition(1)
res90: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[195] at repartition at <console>:55
我们可以看出coalesce(1,true)和repartition(1)的结果其实是一模一样的。
第十步,保存到指定位置
scala> filteredEvents.leftOuterJoin(locRefRDD).map(x=>x._1+"|"+x._2._1._1+"|"+x._2._1._2+"|"+x._2._2.get._1+"|"+x._2._2.get._2).saveAsTextFile("file:\\E:\\result\\")
这样的话会有多个文件
scala> filteredEvents.leftOuterJoin(locRefRDD).map(x=>x._1+"|"+x._2._1._1+"|"+x._2._1._2+"|"+x._2._2.get._1+"|"+x._2._2.get._2).coalesce(1).saveAsTextFile("file:\\E:\\result\\")
最后结果只有一个结果文件
最后生成目录包含了如下文件
._SUCCESS.crc
.part-00000.crc
_SUCCESS
part-00000
到这里,一个简单的例子就完成了。
本篇涉及到的RDD函数有
textFile
split
map
union
filter
join
leftOuterJoin
foreach
coalesce
repartition
saveAsTextFile