Spark官方文档《Spark Programming Guide》的链接: http://spark.apache.org/docs/latest/programming-guide.html
本文原文出处: http://blog.csdn.net/bluishglc/article/details/50715879 严禁任何形式的转载,否则将委托CSDN官方维护权益!
RDD的操作类型: Transformation 和 Action
RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program
transformation:把数据集从一种形式“转置”为另一种形式。
action: 向driver返回一个值。
两者的本质区别是:transformation发生在各个excutor上(类似于MR中的map阶段),action需要从excutor上收集汇总结果(类似于MR中的reduce阶段)
All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently – for example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.
鉴于transformation和excutor的性质,我们不难设想:为了避免driver和executor之间不必要的通信,特别是不要的“中间数据集”的传递,transformation操作应该总是lazy的,也就是说spark不需要也不应该在遇到一个transformation操作时就立刻推送到executor上执行,而是直到遇到一个action时再统一一并运算,因为action是需要从executor返回数据的。这个很好举例,比如一段代码中第一个动作是一个map,执行结束之后每个executor上都是一个经过转置的新的dataset,如果第二个动作还是一个map或其他的transformation,则可以在每个executor上基于当前的这个dataset继续运算,没有返回任何结果给driver的必要。
Passing Functions to Spark
推荐的方式有两种:
- 匿名函数(lamdba表达式),即一段代码片段
- 全局单一的函数体:全局静态方法 or 函数对象, 此类情况暗含的意思是:函数本身不从属于一个特定的对象,无状态不会导致函数依附的对象(的状态)和对象的所有字段(的值)被传递到executor.
但实际上你还可以用第三种方式传递函数,即把函数定义为一个类的方法,以参数的形式把rdd传给这个方法,比如像这样:
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
但是上述做法已经开始暴露出一些“隐患”, 因为func1是MyClass的一个成员方法, 在运行时刻,rdd.map(func1)实际上是rdd.map(this.func1),也就是说在计算时,当时的MyClass的实例会传递到所有的executor上,如果MyClass没有任何字段和外部引用的变量,则尚无明显问题,但当其包含字段时(有状态时),MyClass实例的所有字段的值以及字段依赖的值都需要传递到所有executor上,而其中绝大多数的值是不需要的,这会带来不必要的的数据传输,比如下面这段代码:
class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
我们也可以认为这是不恰当使用“闭包”带来的副作用,
Understanding Closures
对于这一部分,文档中给出的是一个绝佳的例子,透彻理解这个例子背后的原因就能明白Spark与闭包之间的微妙关系:
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
如果这是在本地(单节点)运行的代码,那么这就一段经典的演示“闭包”概念的代码。函数字面量counter += x是一个闭包,它有一个*变量counter, 当这个闭包传递给rdd参与运算的过程中,counter变量一直存在于伴随这个闭包的上下文中,其状态(值)会持续的被改变并保持,因此,在运算结束后,counter的值就是rdd中数据条目的总数。
但如果是在集群模式(多节点)下,情况就大不相同了,由于数据被分散到了多个节点上并行处理,计算逻辑,也就是我们的闭包也要分发到各个节点上去执行(Spark会完成闭包的序列化工作),OK,我们可以确定的是:当闭包分发到各个executor上时,闭包的中所有*变量的值都来自于driver发分发时的那个上下文里,现在闭包完全地独立出来了,就是完全在一个全新的环境中运行了,executor本地的自用变量与driver上的那些对应变量已经没有任何关系了,它们都将在各自的环境里*演变了,彼此之间没有任何联系了。简单地解释就是:闭包和它的上下文只在一个进程(节点)中有效。因此:在本例中,你不能指望在driver端执行这段代码后得到counter的值,实际上它只是在初值是0的时候被复制了n份传递给executor之外,它在driver端什么也没做,而executor端的那个counter副本只在exexutor本地计算着“局部”的counter, 它已和driver端的这个counter没有丝毫的关系,所以最后的结果是:程序执行结束后,打印的这个counter是driver端的这个coutner,它没有被计算,它的值一直是0.
另外一方面,如果counter是一个全局性(正对整个程序或着说集群)的只读的初值,那么作为闭包的*变量传播到整个集群这是有意义的,但如果executor企图要改写一个全局性的初值,则这样做是没有意义的。对于Spark来说,合适作为闭包的*变量应该是这样一种值:在局部数据集计算过程中需要持续追踪状态的变量!
To execute jobs, Spark breaks up the processing of RDD operations into tasks - each of which is operated on by an executor. Prior to execution, Spark computes the closure. The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case foreach()). This closure is serialized and sent to each executor.
In general, closures - constructs like loops or locally defined methods, should not be used to mutate some global state.
Working With Key-Value Pairs
Spark大多数的操作是面向RDD的,而RDD可以包含任何类型的对象。也就是说,Spark大多数的操作是可以应用于任何类型的数据对象的。但是也存在一些特殊操作是面向特定的key-value pairs, 这些特殊操作绝大多数是和“shuffle”相关的,比如按一个key进行grouping和aggregation等。
比如如下的示意代码:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
第二行,经过lines.map操作之后得到的新RDD的元素是以文本每一行为key,以固定数字1为value的pair,而在这样一个RDD上使用reduceByKey时就变得有趣了,让我们先来看看对reduceByKey这个方法的解读:
reduceByKey(func, [numTasks]) :
When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
也就是说:如果数据集是(K,V)对,key的部分会由Spark来自动处理,Spark会将所有key相同的数据集进行分组聚合(也就是分在一起),然后呢,使用用户提供的函数对它们的value进行处理,所以,reduceByKey接受的函数类型是(V,V) => V!所以上面例子中(a, b) => a + b,a和b指代的都是value,也就是数字1,而后续的动作就一个集合的reduce操作无异了。
Shuffle Operations
让我们接着前面的reduceByKey讲,这是一个很好是解释Spark Shuffle操作的例子,对于reduceByKey,它有一个显而易见的挑战:对于同一个Key的所有的value会分布于整个集群的不同节点上,所以Spark在计算时是需要执行一个All-To-All的操作,也就是在所有分区上找到所有Key对应的所有Value,针对第一个key,把它对应的所有的value从所有的分区上带回来进行计算, 这个过程就是shuffle.
shuffle是一个非常消耗资源的操作,为了实现shuffle,spark会进行非常类似于hadoop中的Map和Reduce操作,其中大量Map任务的结果会保存在内存中,所以对内存资源的占用是显而易见的,当内存不足以cache所有的map结果时,这些数据会被持久化到磁盘上,因此这又会带来本地磁盘的I/O操作,另外,shuffle会产生大量中间文件,对于长期运行的spark job会占用很大硬盘空间。
Shared Variables
通常来说, 跨任务(节点)的可读写的共享变量(或者说是scope是整个集群的全局变量)是很低效的,这很容易理解。尽管如此,Spark依然支持了两种形式的共享变量:broadcast variables 和 accumulators
Broadcast Variables
首先,broadcast variables是只读的,更具官方文档的讲述:
The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.
显式地创建broadcast variables只在这种情况下是有意义的:当任务跨域了多了stage,而在这些stage中需要使用同样的数据(对于这种案例暂时还没有找到清晰的例子)。
Accumulators
An accumulator is created from an initial value v by calling SparkContext.accumulator(v). Tasks running on the cluster can then add to it using the add method or the += operator (in Scala and Python). However, they cannot read its value.Only the driver program can read the accumulator’s value, using its value method.
Task只可以向一个Accumulator追加值,但是不能读它的值,只有driver程序才能读取它的值。
本质上accumulator是trait: AccumulableParam的一个子类,它只有最主要的一个操作
addInPlace,但这并不意味着accumulator只可以做简单的加法,这取决于你要怎样书写这个方法的逻辑。Spark内置支持Int类型的accumulator,用户可以继承AccumulableParam来实现自己的Accumulator.
最后,accumulator并不会影响Spark的Lazy机制,也 就是说,在一系列的transformation操作之后 ,accumulator的值未必会被更改,而是直到一个action执行之后,它的值在driver端才有可能更新,比如像下面这个例子:
val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
// Here, accum is still 0 because no actions have caused the <code>map</code> to be computed.