我们在运用Spark的时候,基本都是在于RDD打交道,实际上处理数据都是通过RDD提供的接口来操作。然而我们底层的数据到底是如何管理的呢?这正是今天我要学习的内容。
RDD是有不同分区组成,我们的操作都是基于不同的分区来完成,而在存储管理角度来讲,RDD是以数据块为单位的。本质上其实是等价的。
存储模块中的数据块形式有:RDD数据块; shuffle数据块;广播变量数据块;任务返回结果数据块(结果一般都是随任务通过akka一起返回Driver,但是结果数据太大就会导致将结果先以Block的形式存放在存储模块中)。
1,存储模块框架:
架构上讲,分为通信层和存储层。
1)通信层面采用主从方式实现通信(主从节点间互换消息);
2)存储层负责提供接口来存储数据(可把数据存储到内存,磁盘,或者远端);
存储模块的两大功能:
1)实现RDD形式的缓存(基于内存/磁盘);
2)实现shuffle数据持久化;
2,存储模块与外界通信的接口:
BlockManager类。
1)BlockManager类通过BlockManagerMaster进行通信;
2)主节点的BlockManager会包含所有从节点的BlockManager信息;
3)主从节点之间通过各自的BlockManagerMasterActor来进行相互通信;
3,通信层的消息传递:
由上述可知,主从节点之间传递消息用BlockManagerMasterActor,那么所传递的消息有哪些呢?
通过读源码可知都有如下种类消息:
RegisterBlockManager(是Excutor创建BlockManager后,再向Driver发送请求进行注册);
UpdateBlockInfo(用来更新数据块中的元信息,每个Excutor的BlockManager只会管理其自身Excutor下的元信息);
GetPeers(用来得到其他BlockManager的ID--------------->获取他人BlockManager的ID。既然是获取他人信息,我们回想一下,谁拥有其他所有 人的信息呢?当然是Driver上的BlockManager了,所以GetPeer的完成是通过Excutor向Driver发送消息来完成的);
GetLocations(用来获得数据块自身所在的BlockManager对应的ID--------------->获取自己BlockManager的ID);
GetLocationsMultipleBlockIDs(用来获取一组数据所在的BlockManager对应的ID--------------->获取一组BlockManager的ID)
RemoveExcutor(虽然名字是“删除Excutor”,实际上是删除死亡的Excutor下面的BlockManager)
BlockManagerSlaveActor间互相通信函数如下
RemoveRdd
RemoveBlock
下面我们再来详细过一下RDD分区和数据块的关系:
前面说到,RDD是基于分区来计算的,而在存储管理中,存储是以block为单位的。实际上是RDD这个逻辑概念是和Block物理存储概念是一一对应的。他两是通过映射关系联系到一起的。
具体映射关系:block名=“rddID+分区索引号”。
四,rdd缓存机制
1,RDD默认以基于内存缓存的方式将RDD缓存。
当数据超过缓存阈值时:Spark会丢弃一部分内存中的数据或者将一部分数据从内存移出到磁盘中,具体情况依据RDD的持久化选项。
如果是直接丢弃数据的话,程序会否报错呢?
答案是不一定的。如果被删除的数据的祖先是可被回溯到的,那么可以通过重新计算得到丢失的数据;
相反,程序会报错哦。
2,磁盘缓存。
磁盘缓存机制:
MEMORY_ONLY
MEMORY_AND_DISK
MEMORY_ONLY_SER
MEMORY_AND_DISK_SER
DISK_ONLY
MEMORY_ONLY_2
MEMORY_AND_DISK_2
我们可以通过配置spark.local.dir来配置磁盘缓存目录
存储过程中,首选MEMORY_ONLY;其次选MEMORY_ONLY_SER;
如果数据量大且重新计算的开销大,那就用MEMORY_AND_DISK;
如果要确保快速的恢复机制,那就选MEMORY_ONLY_2,MEMORY_AND_DISK_2(因为有备份)
5,Shuffle数据的持久化:
shuffle数据必须是在磁盘上进行缓存,不能选择在内存中缓存;
RDD在磁盘持久化中,一个block对应一个文件,而shuffle数据块只是逻辑上的概念,存储方式因实现方式不同而不同
(1)默认将shuffle数据块映射成文件
(2)另外一种方式是将shuffle数据块映射成文件中的一段(实现方式是将spark.shuffle.consolidateFile设置为TRUE)
6,广播变量的存储:
广播变量存在的意义:实现数据在每个节点上都有一份拷贝;
Broadcast数据块是以MEMORY_AND_DISK的持久化方式存储的;
设置过期清理机制可以清理过期的广播变量
存储管理模块承担着数据管理,数据间通信,持久化等工作,存储管理模块的好坏对spark性能的影响至关重要!