一.RDD基础
RDD(Resilient Distributed Dataset):弹性分布式数据集
RDD就是一个不可变的分布式对象集合。每个RDD都被分为多个分区,这些分区运行在集群中的不同节点。用户可以通过2中方法创建RDD:
1)读取一个外部数据集
sc.textFile("test.txt")
2)在驱动器程序里分发驱动器程序中的对象集合(比如list和set)
sc.parallelize(["pandas", "i like pandas"])
RDD支持两种类型的操作:
1)转化操作:由一个RDD生成一个新的RDD
val inputRDD = sc.textFile("log.txt")
val errorRDD = inputRDD.filter(line => line.contains("error"))
2)行动操作:对RDD计算出一个结果,并把结果返回到驱动器程序或把结果存储到外部存储系统
inputRDD.first()转化操作和行动操作区别在于Spark只会惰性的进行转化操作,它们只有第一次在一个行动操作中用到时,才会真正计算。比如上面例子中textFile()和filter()方法实际并没有真正执行,只有当执行first()是,textFile()和filter()方法才会真正执行。这也可以很好理解,如果log.txt文件非常大,那在执行textFile()方法是就会消耗很多存储空间,而在后面filter()就要筛选很多数据。
二.常见的转化操作和行动操作
1.转化操作
以rdd ={1,2,3,4}的一组数据为例
1)map():将函数应用于RDD中的每个元素,并返回值构成新的RDD。
rdd.map(x => x + 1)
返回结果:{2,3,4,5}
2)flatMap():将函数应用与RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD。通常用来切分单词。
rdd.flatMap(x => x.to(3))返回结果:{1,2,3,2,3,3,3}
3)filter():返回一个由通过传给filter()的函数的元素组成的RDD
rdd.filter(x => x != 1)返回结果:{2,3,3}
4)distinct():去重
rdd.distinct()返回结果:{1,2,3}
以rdd2 ={1,2,3}和rdd3={3,4,5}的两组数据为例
5)union():生成一个包含两个RDD中的所有元素的RDD
rdd1.union(rdd2)返回结果:{1,2,3,4,5,6}
6)intersection():求两个RDD共同的元素的RDD
rdd1.intersection(rdd2)返回结果:{3}
7)substract():移除一个RDD中的内容
rdd1.substract(rdd2)返回结果:{1,2}
8)cartesian():与另一个RDD的笛卡尔积
rdd1.cartesian(rdd2)返回结果:{(1,3),(1,4),....(3,5)}