本期内容:
1、RDD依赖关系的本质内幕
2、依赖关系下的数据流视图
3、经典的RDD依赖关系解析
4、RDD依赖关系源码内幕
1、RDD依赖关系的本质内幕
由于RDD是粗粒度的操作数据集,每个Transformation操作都会生成一个新的RDD,所以RDD之间就会形成类似流水线的前后依赖关系;在spark中,RDD之间存在两种类型的依赖关系:窄依赖(Narrow Dependency)和宽依赖(Wide Dependency 或者是 Narrow Dependency);如图1所示显示了RDD之间的依赖关系。
图1
从图1中可知:
窄依赖是指每个父RDD的一个Partition最多被子RDD的一个Partition所使用,例如map、filter、union等操作都会产生窄依赖;
宽依赖是指一个父RDD的Partition会被多个子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作都会产生宽依赖;
需要特别说明的是对join操作有两种情况:如果两个RDD在进行join操作时,一个RDD的partition仅仅和另一个RDD中已知个数的Partition进行join,那么这种类型的join操作就是窄依赖,例如图1中左半部分的join操作(join with inputs co-partitioned);其它情况的join操作就是宽依赖,例如图1中右半部分的join操作(join with inputs not co-partitioned),由于是需要父RDD的所有partition进行join的转换,这就涉及到了shuffle,因此这种类型的join操作也是宽依赖。
总结:在这里我们是从父RDD的partition被使用的个数来定义窄依赖和宽依赖,因此可以用一句话概括下:如果父RDD的一个Partition被子RDD的一个Partition所使用就是窄依赖,否则的话就是宽依赖。因为是确定的partition数量的依赖关系,所以RDD之间的依赖关系就是窄依赖;由此我们可以得出一个推论:即窄依赖不仅包含一对一的窄依赖,还包含一对固定个数的窄依赖。
一对固定个数的窄依赖的理解:即子RDD的partition对父RDD依赖的Partition的数量不会随着RDD数据规模的改变而改变;换句话说,无论是有100T的数据量还是1P的数据量,在窄依赖中,子RDD所依赖的父RDD的partition的个数是确定的,而宽依赖是shuffle级别的,数据量越大,那么子RDD所依赖的父RDD的个数就越多,从而子RDD所依赖的父RDD的partition的个数也会变得越来越多。
2、依赖关系下的数据流视图
在spark中,会根据RDD之间的依赖关系将DAG图划分为不同的阶段,对于窄依赖,由于partition依赖关系的确定性,partition的转换处理就可以在同一个线程里完成,窄依赖就被spark划分到同一个stage中,而对于宽依赖,只能等父RDD shuffle处理完成后,下一个stage才能开始接下来的计算。
因此spark划分stage的整体思路是:从后往前推,遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将这个RDD加入该stage中。因此在图2中RDD C,RDD D,RDD E,RDDF被构建在一个stage中,RDD A被构建在一个单独的Stage中,而RDD B和RDD G又被构建在同一个stage中。
在spark中,Task的类型分为2种:ShuffleMapTask和ResultTask;
简单来说,DAG的最后一个阶段会为每个结果的partition生成一个ResultTask,即每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition的数量所决定的!而其余所有阶段都会生成ShuffleMapTask;之所以称之为ShuffleMapTask是因为它需要将自己的计算结果通过shuffle到下一个stage中;也就是说图2中的stage1和stage2相当于mapreduce中的Mapper,而ResultTask所代表的stage3就相当于mapreduce中的reducer。
需要补充说明的是,在前面的课程中,我们实际动手操作了一个wordcount程序,因此可知,Hadoop中MapReduce操作中的Mapper和Reducer在spark中的基本等量算子是map和reduceByKey;不过区别在于:Hadoop中的MapReduce天生就是排序的;而reduceByKey只是根据Key进行reduce,但spark除了这两个算子还有其他的算子;因此从这个意义上来说,Spark比Hadoop的计算算子更为丰富。
3、Stage中任务执行的内幕思考
在一个stage内部,从表面上看是数据在不断流动,然后经过相应的算子处理后再流向下一个算子,但实质是算子在流动;我们可以从如下两个方面来理解:
(1) 数据不动代码动;这点从算法构建和逻辑上来说,是算子作用于数据上,而算子处理数据一般有多个步骤,所以这里说数据不动代码动;
(2) 在一个stage内部,算子之所以会流动(pipeline)首先是因为算子合并,也就是所谓的函数式编程在执行的时候最终进行函数的展开,从而把一个stage内部的多个算子合并成为一个大算子(其内部包含了当前stage中所有算子对数据的所有计算逻辑);其次是由于Transformation操作的Lazy特性。因为这些转换操作是Lazy的,所以才可以将这些算子合并;如果我们直接使用scala语言是不可以的,即使可以在算子前面加上一个Lazy关键字,但是它每次操作的时候都会产生中间结果。同时在具体算子交给集群的executor计算之前首先会通过Spark Framework(DAGScheduler)进行算子的优化(即基于数据本地性的pipeline)。
4、RDD依赖关系源码内幕
源码初探
在IDEA中打开源码,找到org.apache.spark.Dependency.scala这个类,首先我们可以看到如下的代码:
图3
在抽象类Dependency中,rdd就是子RDD所依赖的父RDD,同时所有的依赖都要实现Dependency[T],这点我们可以查看宽依赖和窄依赖的实现源代码。
Dependency源码
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ package org.apache.spark import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleHandle /**
* :: DeveloperApi ::
* Base class for dependencies.
*/
@DeveloperApi
abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
} /**
* :: DeveloperApi ::
* Base class for dependencies where each partition of the child RDD depends on a small number
* of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
*/
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
/**
* Get the parent partitions for a child partition.
* @param partitionId a partition of the child RDD
* @return the partitions of the parent RDD that the child partition depends upon
*/
def getParents(partitionId: Int): Seq[Int] override def rdd: RDD[T] = _rdd
} /**
* :: DeveloperApi ::
* Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
* the RDD is transient since we don't need it on the executor side.
*
* @param _rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
* @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,
* the default serializer, as specified by `spark.serializer` config option, will
* be used.
* @param keyOrdering key ordering for RDD's shuffles
* @param aggregator map/reduce-side aggregator for RDD's shuffle
* @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
*/
@DeveloperApi
class ShuffleDependency[K, V, C](
@transient _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] { override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]] val shuffleId: Int = _rdd.context.newShuffleId() val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.size, this) _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
} /**
* :: DeveloperApi ::
* Represents a one-to-one dependency between partitions of the parent and child RDDs.
*/
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
} /**
* :: DeveloperApi ::
* Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
* @param rdd the parent RDD
* @param inStart the start of the range in the parent RDD
* @param outStart the start of the range in the child RDD
* @param length the length of the range
*/
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int): List[Int] = {
if (partitionId >= outStart && partitionId < outStart + length) {
List(partitionId - outStart + inStart)
} else {
Nil
}
}
}
4.1窄依赖源代码分析:
接着我们可以看到NarrowDependency这个抽象类源码:
图4
其中getParents这个函数的作用是返回子RDD的partitioneId依赖的所有的父RDD的partitions;
我们在上面说过,窄依赖有两种情况:一种是一对一的依赖,另一种是一对确定个数的依赖,我们可以从源代码中找到这两种窄依赖的具体实现;第一种即为OneToOneDependency:
图5
从getParents的实现可知,子RDD仅仅依赖于父RDD相同ID的Partition;
那么第二种类型的窄依赖即为:RangeDependency,它只被org.apache.spark.rdd.UnionRDD所使用;我们可以在UnionRDD中看下相应的使用情况:
图6
UnionRDD是将多个RDD合并成一个RDD,这些RDD是被拼接起来的,即每个父RDD的partition的相对顺序不变,只是每个父RDD在UnionRDD中的Partition的起始位置不同,具体我们可以看看RangeDependency中getParents方法的实现:
图7
其中,inStart是父RDD中Partition的起始位置,outStart是在UnionRDD中的起始位置,length是父RDD中Partition的数量。
4.2宽依赖源代码分析
由于宽依赖的实现只有一种:ShuffleDependency;即父RDD的一个Partition被子RDD的多个partition所使用,我们主要关注以下两点:
图8
ShuffleId表示获取新的Id,下面的shuffleHandle表示向ShuffleManger注册Shuffle信息。
宽依赖支持两种类型的Shuffle Manager,即HashShuffleManager和SortShuffleManager。如图9所示:
图9
感谢下面的博主:
http://bbs.pinggu.org/thread-4637506-1-1.html
DT大数据梦工厂联系方式:
新浪微博:www.weibo.com/ilovepains/
微信公众号:DT_Spark
博客:http://.blog.sina.com.cn/ilovepains
TEL:18610086859
Email:18610086859@vip.126.com