Spark中的RDD是一个不可变的分布式对象集合,每个RDD都会被分为多个分区,这些分区运行在集群中的不同节点上。
创建RDD两种办法:
常用(读取外部数据集) :
testFile
把程序中一个已有的集合传给parallelize,不常用,占内存:
sc.parallelize(List(“a”,”c”))
RDD的持久化也称为缓存(persist):
SparkRDD是惰性求值的,而有时我们希望多次使用同一个RDD,如果简单的对RDD调用行动操作,Spark会重算RDD以及它的所有依赖。这样的消耗格外的大。
例如下面的例子:
我使用了2次Action算子(分别是计算数量。将数据输出成字符串的格式并且以逗号进行分割)这样等于是计算了2次RDD,消耗大。
val result = input.map(x=>x*x) println(result.count()) println(result.collect().makString(,))
为了避免多次计算同一个RDD,可以让Spark对数据进行持久化。为了避免一个有持久化节点数据的设备发生故障,我们可以将数据备份到多个节点上(存储级别的末尾加上_2)
持久化存储的级别:
MEMORY_ONLY |
将数据缓存到内存中。计算超快 |
MEMORY_ONLY_SER |
将数据序列化后保存到内存中,可减少内存占用率。总体也是很快的 |
MEMORY_AND_DISK |
内存占不下,则溢写到硬盘中。计算快 |
MEMORY_AND_DISK_SER |
在内存中存放序列化后的数据,内存占不下,则溢写到硬盘中。计算较快 |
存储级别末尾_2备份两份 |
|
因此可以将上面的代码改成:
Import org.apache.spark.storage.StorageLevel Val result = input.map(x=>x*x) Result.persist(StorageLevel.MEMORY_ONLY_2) Println(result.count()) Println(result.collect().makString(,)
这里是在Action算子之前对数据进行了持久化,此时当我再次调用Action算子的时候就不会重复计算RDD了
手动把持久化的RDD从缓存中移除
unpersist