spark-RDD(弹性分布式数据集)

时间:2021-12-18 06:09:39

Spark中的RDD是一个不可变的分布式对象集合,每个RDD都会被分为多个分区,这些分区运行在集群中的不同节点上。


创建RDD两种办法:

常用(读取外部数据集:

testFile

把程序中一个已有的集合传给parallelize,不常用,占内存:

sc.parallelize(List(“a”,”c”))

RDD的持久化也称为缓存(persist):

 

    SparkRDD是惰性求值的,而有时我们希望多次使用同一个RDD,如果简单的对RDD调用行动操作Spark会重算RDD以及它的所有依赖。这样的消耗格外的大。

例如下面的例子:

  我使用了2Action算子(分别是计算数量。将数据输出成字符串的格式并且以逗号进行分割)这样等于是计算了2RDD,消耗大。

val result = input.map(x=>x*x)

println(result.count())

println(result.collect().makString(,))


  为了避免多次计算同一个RDD,可以让Spark对数据进行持久化。为了避免一个有持久化节点数据的设备发生故障,我们可以将数据备份到多个节点上(存储级别的末尾加上_2)

 

持久化存储的级别:

MEMORY_ONLY

将数据缓存到内存中。计算超快

MEMORY_ONLY_SER

将数据序列化后保存到内存中,可减少内存占用率。总体也是很快的

MEMORY_AND_DISK

内存占不下,则溢写到硬盘中。计算快

MEMORY_AND_DISK_SER

在内存中存放序列化后的数据,内存占不下,则溢写到硬盘中。计算较快

存储级别末尾_2备份两份

 

 因此可以将上面的代码改成:

Import org.apache.spark.storage.StorageLevel

Val result = input.map(x=>x*x)

Result.persist(StorageLevel.MEMORY_ONLY_2)

Println(result.count())

Println(result.collect().makString(,)

这里是在Action算子之前对数据进行了持久化,此时当我再次调用Action算子的时候就不会重复计算RDD

手动把持久化的RDD从缓存中移除

unpersist