Spark计算模型
Spark程序模型
一个经典的示例模型
-
SparkContext
中的textFile
函数从HDFS读取日志文件,输出变量file
var file = sc.textFile("hdfs://***")
-
RDD中的
filter
函数过滤带有'ERROR'的行,输出errors
(一个RDD)var errors = file.filter(line=>line.contains("ERROR"))
RDD中的
count
函数返回"ERROR"的行数:errors.count()
小结
- 用户程序对RDD通过多个函数进行操作,将RDD进行转换
- BlockManager管理RDD的物理分区,每个Block节点上对应一个数据块,可以存储在内存中或者磁盘
- RDD中的
partition
是一个逻辑数据块,对应相应的物理块Block - 一个RDD在代码中相当于是数据的一个元数据结构,存储着数据分区及其逻辑结构映射关系,存储着RDD之前的依赖转换关系
弹性分布式数据集(resilient distributed dataset,RDD)
RDD简介
- 它是一个分布式数据架构,是逻辑集中的实体,在集群中的多台机器上进行了数据分区
- RDD是Spark的核心数据结构,通过RDD的依赖关系形成了Spark的调度顺序
- 通过对RDD的操作形成整个Spark程序
RDD的两种创建方式
- 从Hadoop文件系统(或其他与Hadoop兼容的持久化存储系统,如Hive,Cassandra,Hbase...)输入创建
- 从父RDD转换得到新的RDD
RDD的两种操作算子
RDD有两种计算操作算子:Transformation(转换)和Action(行动)
Transformation(转换)
Transformation是延迟计算的,也就是说从一个RDD转换成另一个RDD的转换操作不是马上执行的,需要等到有Actions操作时,才真正触发
Action(行动)
Action算子会触发Spark提交作业,并将数据输出到Spark系统
RDD的重要内部属性
- 分区列表
- 计算每个分片的函数
- 对父RDD的依赖列表
- 对Key-Value对数据类型RDD的分区器,控制分区策略和分区数
- 每个数据分区的地址列表(如HDFS上的数据块的地址)
RDD与DSM的异同
DSM
- DSM(distributed shared memory, 分布式共享内存).
- 在DSM系统中,应用可以向全局地址空间的任意位置进行读写操作
- DSM是一种通用的内存数据抽象
RDD与DSM的区别
- RDD不仅可以通过批量转换创建RDD,还可以对任意内存位置读写
- RDD限制应用执行批量写操作,这样有利于实现有效的容错
Spark的数据存储
- 核心是RDD
- RDD可被抽象为一个大的数组,但是这个数组是分布在集群上的
- 逻辑上RDD的每个分区叫一个Partition
- RDD对象实质上是一个元数据结构,存储着Block,Node等的映射关系
- 一个RDD就是一组分区,在物理数据存储上,RDD的每个分区对应的就是一个Block,Block可以存储在内存,也可以缓存在磁盘上
- 每个Block中存储着RDD所有数据项的一个子集,暴露给用户的可以是一个Block的迭代器也可以是一个数据项
- HDFS中的一个分区对应Spark的一个分区
- 支持Hash分区和Range分区
算子的分类及功能
算子的分类
-
Value数据类型的Transformation算子
这种变换并不触发提交作业,只对处理的数据项是Value型的数据
-
Key-Value性Transformation算子
这种变换不触发提交作业,针对处理的数据项是Key-Value型的数据对
Action算子,这类算子触发SparkContext提交Job作业
Value型Transformation算子
根据RDD变换算子的输入分区和输出分区关系分为以下几种类型.
输入分区和输出分区一对一型
map
将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素.源码中的map算子相当于初始化一个RDD,新RDD叫做MappedRDD(this, sc.clean(f))
flatMap
将原来RDD中的每个元素通过函数f转换为新的元素,并将生成的RDD的每个集合中的元素合并为一个集合.内部创建FlatMappedRDD(this, sc.clean(f))
mapPartitions
mapPartitions
函数获取到每个分区的迭代器,在函数中通过这个分区整体的迭代器对整个分区的元素进行操作.内部实现是生成MapPartitionsRDD
.
glom
glom
函数将每个分区形成一个数组,内部实现是返回的GlommeRDD
.
输入分区和输出分区多对一型
union
使用union
函数时需要保证两个RDD元素的数据类型相同,返回的RDD数据类型和被合并的RDD元素数据类型相同,并不进行去重操作,保存所有元素.
如果想去重可以适用distinct()
.++
符号相当于union
函数操作
cartesian
对两个RDD内的所有元素进行笛卡尔积操作.操作后,内部实现返回`CartesianRDD.
输入分区与输出分区多对多型
groupBy
将元素通过函数生成相应的Key,数据就转化为Key-Value格式,之后将Key相同的元素分为一组.
函数实现如下:
-
sc.clean()
函数将用户函数预处理:var cleanF = sc.clean(f)
-
对数据map进行函数操作,最后再对groupByKey进行分组操作
this.map(t=>(cleanF(t), t)).groupByKey(p)
其中,p中确定了分区个数和分区函数,也就决定了并行化的程度.
输出分区为输入分区子集型
filter
filter
的功能是对元素进行过滤,对每个元素应用f函数,返回值为true的元素在RDD中保留,返回false的将过滤掉.内部实现相当于生成FilteredRDD(this, sc.clean(f))
.
下面代码是函数的本质实现:
def filter(f:T=>Boolean):RDD[T]=new FilteredRDD(this,sc.clean(f))
distinct
distinct
将RDD中的元素进行去重操作.
subtract
subtract
相当于进行集合的差操作.RDD1去除RDD1和RDD2交集中的所有元素.
sample
sample
将RDD这个集合内的元素进行采样,获取所有元素的子集.用户可以设定是否有放回的抽样,百分比,随机种子,进而决定采样方式.
内部实现是生成SampledRDD(withReplacement, fraction, seed)
- withReplacement:是否有放回的抽样.=true为有放回抽样
- fraction:采样百分比,0-1
- seed:随机种子
takeSample
takeSample()
函数和上面的sample
函数是一个原理,但是不使用相对比例采样,而是按设定的采样个数进行采样,同时返回结果不再是RDD,而是相当于对采样后的数据进行Collect(),返回结果的集合为单机的数组.
Cache型
cache
cache
将RDD元素从磁盘缓存到内存,相当于persist(MEMORY_ONLY
函数的功能.
persist
persist
函数对RDD进行缓存操作,数据缓存在哪里由StorageLevel
枚举类型确定.有以下几种类型的组合,DISK代表磁盘,MEMORY代表内存,SER代表数据是否进行序列化存储.
下面为函数定义,StorageLevel是枚举类型,代表存储模式:
persist(newLevel: StorageLevel)
Key-Value型Transformation算子
Transformation处理的数据为Key-Value形式的算子,大致分为一下3中类型.
输入分区与输出分区一对一
mapValues
针对(Key, Value)型数据中的Value进行Map操作,而不对Key进行处理
聚集
对单个RDD聚集
combineByKey
定义combineByKey算子的代码如下:
combineByKey[C](createCombiner:(V)=>C,
mergeValue:(C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner
mapSideCombine: boolean = true,
serializer: Serializer = null): RDD[(K, C)]
说明
- createCombiner:V => C, 在C不存在的情况下,如通过V创建seq C.
- mergeValue: (C, V) => C, 在C已经存在的情况下,需要merge,如把item V加到seq C中,或者叠加
- mergeCombiners: (C, C) => C, 合并两个C
- partitioner: Partitioner(分区器),Shuffle时需要通过Partitioner的分区策略进行分区
- mapSideCombine: Boolean=true, 为了减小传输量,很多combine可以在map端先做.
- serializerClass: String=null, 传输需要序列化,用户可以自定义序列化类
例如相当于把(Int, Int)的RDD变成了(Int, Seq[Int])类型元素的RDD
reduceByKey
多个值合并成一个值(将相同Key的数据的value相加).
函数实现如下.
def reduceByKey(partitioner: Partitioner, func: (V, V)=>V): RDD[(K, V)] = {
combineByKey[V]((v: V)=>v, func, func, partitioner)
}
partitionBy
partitionBy
函数对RDD进行分区操作.
函数定义如下.
partitionBy(partitioner: Partitioner)
如果原有RDD的分区器和现有分区器一致,则不重新分区,否则相当于根据分区器生成一个新的ShuffledRDD.
对两个RDD进行聚集
cogroup
cogroup
函数将两个RDD进行协同划分,cogroup
函数的定义如下.
cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
对于两个RDD中的Key-Value类型的元素,每个RDD相同Key的元素分别聚合为一个集合,并且返回两个RDD中对应Key的元素集合的迭代器(K, (Itetable[V], Iterable[W])
.其中Key和Value,Value是两个RDD想相同Key的两个数据集合的迭代器所构成的元组.
连接
join
join
对两个需要连接的RDD进行cogroup
函数操作,cogroup
操作之后形成的新RDD,对每个Key下的元素进行笛卡尔积操作,返回的结果再展平,对应Key下的所有元组形成一个集合,最后返回RDD[(K, (V, W))]
下面是join
的函数实现,本质是通过cogroup
算子先进行协同划分,再通过flatMapValues
将合并的数据打散.
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
for (v <- vs; w <- ws) yield (v, w) }
leftOutJoin和rightOutJoin
leftOutJoin
和rightOutJoin
相当于在join
基础上先判断一侧的RDD元素是否为空,如果为空,则填充为空.如果不为空,则将数据进行连接运算.
下面代码是leftOutJoin
的实现.
if (ws.isEmpty) {
vs.map(v => (v, None))
} else {
for (v <- vs; w <- ws) yield (v, Some(w))
}
Actions算子
本质上在Actions算子中通过SparkContext执行提交作业的runJob操作,出发了RDD DAG的执行.
下面根据Action算子的输出空间将Action算子进行分类.
无输出
foreach
对RDD中的每个元素对应用f函数操作,不返回RDD和Array,而是返回Uint
HDFS
saveAsTextFile
函数将数据输出,存储到HDFS的指定目录.
下面为函数的内部实现.
this.map(x => (NullWritable.get(), new Text(x.toString)))
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
将RDD中的每个元素映射变为(Null, x.toString),然后再将其写入HDFS.
saveAsObjectFile
saveAsObjectFile
函数将分区中的每10个元素组成一个Array,然后将这个Array序列化映射为(Null, BytesWritable(Y))
的元素,写入HDFS为SequenceFile的格式.
下面是代码实现.
map(x=>(NullWirtable.get(), new BytesWritable(Utils.serialize(x))))
Scala集合和数据类型
collect
collect
相当于toArray
,将分布式的RDD返回为一个单机的scala Array数组.在这个数组上运用scala的函数式操作.
collectAsMap
collectAsMap
函数对(K, V)型的RDD数据返回一个单机HashMap.对于重复K的RDD元素,后面的元素覆盖前面的元素 .
reduceByKeyLocally
实现的是先reduce再collectAsMap的功能,相对RDD的整体进行reduce操作,然后再收集所有结果返回为一个HashMap.
lookup
下面代码是lookup
的声明.
lookup(key: K): Seq[V]
Lookup函数对(Key, Value)型的RDD操作,返回指定Key对应的元素形成的Seq.这个函数处理优化的部分在于,如果这个RDD包含分区器,则只会对应处理K所在的分区,然后返回由(K, V)形成的Seq.如果RDD不包含分区器,则需要对全RDD元素进行暴力扫描处理,搜索指定K对应的元素.
count
count
返回真个RDD的元素个数.内部函数实现如下.
def count(): Long=sc.runJob(this, Utils.getIteratorSize_).sum
top
top
可返回最大的K个元素.函数定义如下.
top(num: Int)(implicit ord: Ordering[T]): Array[T]
相近函数说明
函数 | 说明 |
---|---|
top | 返回最大的k个元素 |
take | 返回最小的K个元素 |
takeOrdered | 返回最小的K个元素,并且在返回的数组中保持元素的顺序 |
first | 相当于top(1)返回整个RDD中的前k个元素,可以定义排序的方式Ordering[T].返回的是一个数组 |
reduce
reduce
函数相当于对RDD中的元素进行reduceLeft
函数的操作.函数定义如下.
Some(iter.reduceLeft(cleanF))
reduceLeft
先对两个元素<K,V>进行reduce
函数操作,然后将结果和迭代器取出的下一个元素<K, V>进行reduce
函数操作,直到迭代器遍历完所有元素,得到最后结果.
在RDD中先对每个分区中的所有元素<K, V>的集合分别进行reduceLeft
.每个分区形成的结果相当于一个元素<K, V>,再对这个结果集合进行reduceLeft
操作.
fold
fold
和reduce
的原理相同,但是于reduce
不同,相当于每个reduce
时,迭代器取的第一个元素是zeroValue
aggregate
aggregate
先对每个分区的所有元素进行aggregate
操作,再对分区的结果进行fold
操作.
aggreagate
与fold
和reduce
的不同之处在于,aggreagate
相当于采用归并的方式进行数据聚集,这种聚集是并行化的.而在fold
和reduce
函数的运算过程中,每个分区中需要进行串行处理,每个分区串行计算完结果,结果再按之前的方式进行聚集,并返回最终聚集结果.
函数定义如下.
aggregate[B](z: B)(seqop: (B, A)=>B, combop: (B, B) => B): B
Spark计算模型的更多相关文章
-
Spark计算模型RDD
RDD弹性分布式数据集 RDD概述 RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变.可分区.里面的元素可并行 ...
-
Spark计算模型-RDD介绍
在Spark集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集(Resilient Distributed DataSet,RDD),它是逻辑集中的实体,在集群中的多台集群上进行数据分区.通 ...
-
Spark(六) -- Spark计算模型
整个Spark框架都是基于RDD算子来进行计算的. What is RDD? Resilient Distributed Dataset(RDD),分布式弹性数据集,是Spark上的一个核心抽象 表示 ...
-
重要 | Spark和MapReduce的对比,不仅仅是计算模型?
[前言:笔者将分上下篇文章进行阐述Spark和MapReduce的对比,首篇侧重于"宏观"上的对比,更多的是笔者总结的针对"相对于MapReduce我们为什么选择Spar ...
-
【CDN+】 Spark入门---Handoop 中的MapReduce计算模型
前言 项目中运用了Spark进行Kafka集群下面的数据消费,本文作为一个Spark入门文章/笔记,介绍下Spark基本概念以及MapReduce模型 Spark的基本概念: 官网: http://s ...
-
Spark入门实战系列--3.Spark编程模型(上)--编程模型及SparkShell实战
[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 .Spark编程模型 1.1 术语定义 l应用程序(Application): 基于Spar ...
-
Spark中文指南(入门篇)-Spark编程模型(一)
前言 本章将对Spark做一个简单的介绍,更多教程请参考:Spark教程 本章知识点概括 Apache Spark简介 Spark的四种运行模式 Spark基于Standlone的运行流程 Spark ...
-
Spark:Spark 编程模型及快速入门
http://blog.csdn.net/pipisorry/article/details/52366356 Spark编程模型 SparkContext类和SparkConf类 代码中初始化 我们 ...
-
【Spark深入学习 -13】Spark计算引擎剖析
----本节内容------- 1.遗留问题解答 2.Spark核心概念 2.1 RDD及RDD操作 2.2 Transformation和Action 2.3 Spark程序架构 2.4 Spark ...
随机推荐
-
CPrimerPlus第十一章中的“选择排序算法”学习
C Primer Plus第十一章字符串排序程序11.25中,涉及到“选择排序算法”,这也是找工作笔试或面试可能会遇到的题目,下面谈谈自己的理解. 举个例子:对数组num[5]={3,5,2,1,4} ...
-
Oracle【IT实验室】数据库备份与恢复之三:OS备份/用户管理的备份与恢复
用户管理的备份与恢复也称 OS物理备份,是指通过数据库命令设置数据库为备份 状态,然后用操作系统命令,拷贝需要备份或恢复的文件.这种备份与恢复需要用户的 参与手工或自动完成. 对于使用 OS拷贝备份的 ...
-
【读书笔记】iOS-GCD-Dispatch Queue
一,Dispatch Queue的实现: 1,用于管理追加的Block的C语言层实现的FIFO队列. 2,Atomic函数中实现的用于排他控制的轻量级信号. 3,用于管理线程的C语言层实现的一些容器. ...
-
c标签 总结
JSTL标签用法 关键字:JSTL标签.<c:choose>.<c:forEach>.<c:forTokens>.<c:if>.<c:import ...
-
我使用过的Linux命令之date - 显示、修改系统日期时间
原文地址:http://www.cnblogs.com/diyunpeng/archive/2011/11/20/2256538.html 用途说明 ate命令可以用来显示和修改系统日期时间,注意不是 ...
-
Problem H
Problem Description 穿过幽谷意味着离大魔王lemon已经无限接近了! 可谁能想到,yifenfei在斩杀了一些虾兵蟹将后,却再次面临命运大迷宫的考验,这是魔王lemon设下的又一个 ...
-
python接口自动化测试(七)unittest 生成测试报告
用例的管理问题解决了后,接下来要考虑的就是报告我问题了,这里生成测试报告主要用到 HTMLTestRunner.py 这个模块,下面简单介绍一下如何使用: 一.下载HTMLTestRunner下载: ...
-
node.js安装全攻略
node.js学习也有近一年之久了,自开始以来,能遇到各种问题,就拿安装来说,总能遇到稀奇古怪的问题,但每次谷歌,或者百度,都能找到解决办法,逐渐也就养成了百度或者谷歌的毛病! 正常安装 第一步:下载 ...
-
详解C#泛型(一)
一.C#中的泛型引入了类型参数的概念,类似于C++中的模板,类型参数可以使类型或方法中的一个或多个类型的指定推迟到实例化或调用时,使用泛型可以更大程度的重用代码.保护类型安全性并提高性能:可以创建自定 ...
-
LOL TGP更新影响VS debug 问题
刚才看群里说到VS无法调试,出现"无法使用xxx附加到应用程序'webdev.webserver...'"的问题,群友提出自己的经历,可能是LOL TGP的问题. 提问者卸载了TG ...