RDD弹性分布式数据集的基本操作

时间:2023-01-13 06:05:29
RDD的中文解释是弹性分布式数据集。
构造的数据集的时候用的是List(链表)或者Array数组类型
/* 使用makeRDD创建RDD */ /* List */ val rdd01 = sc.makeRDD(List(1,2,3,4,5,6)) val r01 = rdd01.map { x => x * x } println(r01.collect().mkString(",")) /* Array */ val rdd02 = sc.makeRDD(Array(1,2,3,4,5,6)) val r02 = rdd02.filter { x => x < 5} println(r02.collect().mkString(",")) val rdd03 = sc.parallelize(List(1,2,3,4,5,6), 1) val r03 = rdd03.map { x => x + 1 } println(r03.collect().mkString(",")) /* Array */ val rdd04 = sc.parallelize(List(1,2,3,4,5,6), 1) val r04 = rdd04.filter { x => x > 3 } println(r04.collect().mkString(","))

也可以直接用文件系统来构造

1 val rdd:RDD[String] = sc.textFile("file:///D:/sparkdata.txt", 1)
2 val r:RDD[String] = rdd.flatMap { x => x.split(",") }
3 println(r.collect().mkString(","))

RDD的操作分为转化操作(transformation)和行为操作(action),

转化操作和行为操作的本质区别

转化操作使一个RDD转化为另一个RDD而行动操作就是进行实际的计算

 1 val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
 2 val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
 3 val rddFile:RDD[String] = sc.textFile(path, 1)
 4  
 5 val rdd01:RDD[Int] = sc.makeRDD(List(1,3,5,3))
 6 val rdd02:RDD[Int] = sc.makeRDD(List(2,4,5,1))
 7  
 8 /* map操作 */参数是函数,函数应用于RDD每一个元素,返回值是新的RDD
 9 println("======map操作======")
10 println(rddInt.map(x => x + 1).collect().mkString(","))
11 println("======map操作======")
12 /* filter操作 */参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD
13 println("======filter操作======")
14 println(rddInt.filter(x => x > 4).collect().mkString(","))
15 println("======filter操作======")
16 /* flatMap操作 */参数是函数,函数应用于RDD每一个元素,将元素数据进行拆分,变成迭代器,返回值是新的RDD
17 println("======flatMap操作======")
18 println(rddFile.flatMap { x => x.split(",") }.first())
19 println("======flatMap操作======")
20 /* distinct去重操作 */没有参数,将RDD里的元素进行去重操作方法转换操作生成一个只包含不同元素的一个新的RDD。开销很大。 
21 println("======distinct去重======")
22 println(rddInt.distinct().collect().mkString(","))
23 println(rddStr.distinct().collect().mkString(","))
24 println("======distinct去重======")
25 /* union操作 */会返回一个包含两个RDD中所有元素的RDD,包含重复数据。
26 println("======union操作======")
27 println(rdd01.union(rdd02).collect().mkString(","))
28 println("======union操作======")
29 /* intersection操作 */只返回两个RDD中都有的元素。可能会去掉所有的重复元素。通过网络混洗来发现共有元素
30 println("======intersection操作======")
31 println(rdd01.intersection(rdd02).collect().mkString(","))
32 println("======intersection操作======")
33 /* subtract操作 */返回只存在第一个RDD中而不存在第二个RDD中的所有的元素组成的RDD。也需要网络混洗
34 println("======subtract操作======")
35 println(rdd01.subtract(rdd02).collect().mkString(","))
36 println("======subtract操作======")
37 /* cartesian操作 */计算两个RDD的笛卡尔积,转化操作会返回所有可能的(a,b)对,其中a是源RDD中的元素,而b则来自于另一个RDD。 
38 println("======cartesian操作======")
39 println(rdd01.cartesian(rdd02).collect().mkString(","))
40 println("======cartesian操作======")

以下是行动操作代码

 1 val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
 2 val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
 3  
 4 /* count操作 */返回RDD所有元素的个数
 5 println("======count操作======")
 6 println(rddInt.count())
 7 println("======count操作======")  
 8 /* countByValue操作 */各元素在RDD中出现次数
 9 println("======countByValue操作======")
10 println(rddInt.countByValue())
11 println("======countByValue操作======")
12 /* reduce操作 */并行整合所有RDD数据,例如求和操作
13 println("======reduce操作======")
14 println(rddInt.reduce((x ,y) => x + y))
15 println("======reduce操作======")
16 /* fold操作 */和reduce功能一样,不过fold带有初始值
17 println("======fold操作======")
18 println(rddInt.fold(0)((x ,y) => x + y))
19 println("======fold操作======")
20 /* aggregate操作 */和reduce功能一样,不过fold带有初始值
21 println("======aggregate操作======")
22 val res:(Int,Int) = rddInt.aggregate((0,0))((x,y) => (x._1 + x._2,y),(x,y) => (x._1 + x._2,y._1 + y._2))
23 println(res._1 + "," + res._2)
24 println("======aggregate操作======")
25 /* foeach操作 */对RDD每个元素都是使用特定函数就是遍历
26 println("======foeach操作======")
27 println(rddStr.foreach { x => println(x) })
28 println("======foeach操作======")
.mapValues(x=>(x,1)).//mapValues是对值的操作,不操作key使数据变成(Tom,(26,1))

map()指的是对key进行操作

mapValues()指的是对Values进行操作

first()返回的是dataset中的第一个元素

take(n)返回前n个elements,这个是driverprogram返回的

takeSample(withReplacementnum,seed)抽样返回一个dataset中的num个元素,随机种子seed

saveAsTextFile(path)把dataset写到一个textfile中,或者HDFS支持的文件系统中,spark把每条记录都转换为一行记录,然后写到file中

saveAsTextFile(path)只能用在key-value对上,然后生成SequenceFile写到本地或者hadoop文件系统

saveAsObjectFile(path)把dataset写到一个java序列化的文件中,用sparkContext,objectFile()装载

countByKey()返回的是key对应的个数的一个map.,作用与一个RDD

参考https://www.cnblogs.com/sharpxiajun/p/5506822.html加上自己的理解

transformation和action的主要区别


接口定义方式不同

1.Transformation:RDD[X]->RDD[Y]

2.Action:RDD[X]->Z(Z不是一个RDD,可能是基本类型,数组等)

执行方式也不同

Transformation只会记录RDD转化关系,并不会产生计算(惰性执行,LazyExecution)

Action是触发程序执行(分布式)的算子