1. Map-family transformations
map is one of the classic RDD transformations, so we start with it. Its source is as follows:
    def map[U: ClassTag](f: T => U): RDD[U] = withScope {
      val cleanF = sc.clean(f)
      new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
    }
1.1 sc.clean(f)
The first thing that stands out is the clean function. What does it do? Let's look at its definition:
    /**
     * Clean a closure to make it ready to be serialized and sent to tasks
     * (removes unreferenced variables in $outer's, updates REPL variables)
     * If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively
     * check to see if <tt>f</tt> is serializable and throw a <tt>SparkException</tt>
     * if not.
     */
    private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
      ClosureCleaner.clean(f, checkSerializable)
      f
    }
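The comment is clear: clean tidies up a closure so that it can be serialized and sent to tasks. The point to understand is why a closure needs to be serialized and shipped to tasks at all.
The closure passed to map is created in the driver program, but the actual execution is handed to the executors. That is why the closure has to be serialized and sent over the network.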
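The logic that prepares a closure for serialization lives in ClosureCleaner.clean. Note that it does not perform the serialization itself; that happens later, when the task is shipped.
There is too much code to quote here. The basic idea is reflection. An anonymous function is an object, and the variables it captures, including the reference to its enclosing object ($outer), become fields of that object. ClosureCleaner inspects the closure to find which outer references are actually used and nulls out the rest, so that serializing the closure does not drag along unrelated and possibly non-serializable objects. The executor then deserializes the function object and calls it.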
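To make the serialization concern concrete, here is a minimal sketch in plain Scala, without Spark. It is only a toy model: the hand-written function classes stand in for compiler-generated closures, and serialize, deserialize, canSerialize, AddOffset, AddOffsetWithBaggage and Connection are all hypothetical names invented for illustration. It does not reproduce what ClosureCleaner does; it only shows why an unused reference captured by a closure can make shipping fail.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{NotSerializableException, ObjectInputStream, ObjectOutputStream}

object ClosureShippingSketch {
  // Hypothetical "driver side" helper: turn a function object into bytes.
  def serialize(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Hypothetical "executor side" helper: rebuild the function object from bytes.
  def deserialize[A](bytes: Array[Byte]): A = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  // True if Java serialization accepts the object, false if it hits a
  // non-serializable field.
  def canSerialize(obj: AnyRef): Boolean =
    try { serialize(obj); true } catch { case _: NotSerializableException => false }

  // Stand-in for something that cannot be shipped (a socket, a context, ...).
  final class Connection

  // A function object whose only state is the value it actually needs.
  final class AddOffset(offset: Int) extends (Int => Int) with Serializable {
    def apply(x: Int): Int = x + offset
  }

  // The same function, but it also holds a reference it never uses in apply.
  // Declaring it as a `val` forces it to be kept as a field.
  final class AddOffsetWithBaggage(offset: Int, val baggage: Connection)
      extends (Int => Int) with Serializable {
    def apply(x: Int): Int = x + offset
  }

  def main(args: Array[String]): Unit = {
    val shipped = deserialize[Int => Int](serialize(new AddOffset(10)))
    println(shipped(5))
    println(canSerialize(new AddOffsetWithBaggage(10, new Connection)))
  }
}
```

The second function fails to serialize even though apply never touches baggage. Removing references like that before shipping is, roughly, the job the doc comment of clean describes as "removes unreferenced variables in $outer's".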
1.2 MapPartitionsRDD
    /**
     * An RDD that applies the provided function to every partition of the parent RDD.
     */
    private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
        var prev: RDD[T],
        f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
        preservesPartitioning: Boolean = false)
      extends RDD[U](prev) {

      override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

      override def getPartitions: Array[Partition] = firstParent[T].partitions

      override def compute(split: Partition, context: TaskContext): Iterator[U] =
        f(context, split.index, firstParent[T].iterator(split, context))

      override def clearDependencies() {
        super.clearDependencies()
        prev = null
      }
    }
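1.2.1 This is a MapPartitionsRDD. Notice how it describes itself.
prev is the parent RDD and f is the map function. That is all: there is no data belonging to the MapPartitionsRDD itself.
This confirms once more that a transformation produces no data. It only records the parent RDD and how to transform it; no dataset is materialized at the transformation stage.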
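The "record only the parent and the function" idea can be sketched in a few lines of plain Scala. This is a toy model, not Spark's implementation: MiniRDD, SourceMiniRDD, MappedMiniRDD and the sourceReads counter are hypothetical names, there is a single partition, and collect stands in for an action.

```scala
object LazyLineageSketch {
  // Counts how many times the source data is actually read.
  var sourceReads = 0

  // Toy stand-in for an RDD: one partition, no scheduler, no cluster.
  abstract class MiniRDD[T] {
    def compute(): Iterator[T]

    // Like RDD.map: build a child that stores only (parent, function).
    def map[U](f: T => U): MiniRDD[U] =
      new MappedMiniRDD[U, T](this, iter => iter.map(f))

    // Toy "action": the only place where data is pulled through the chain.
    def collect(): List[T] = compute().toList
  }

  // Leaf of the lineage: the only node that holds data.
  final class SourceMiniRDD[T](data: Seq[T]) extends MiniRDD[T] {
    def compute(): Iterator[T] = { sourceReads += 1; data.iterator }
  }

  // Same shape as MapPartitionsRDD: a parent and a function, nothing else.
  final class MappedMiniRDD[U, T](prev: MiniRDD[T], f: Iterator[T] => Iterator[U])
      extends MiniRDD[U] {
    def compute(): Iterator[U] = f(prev.compute())
  }

  def main(args: Array[String]): Unit = {
    val mapped = new SourceMiniRDD(Seq(1, 2, 3)).map(_ * 2).map(_ + 1)
    println(sourceReads)       // still 0: nothing has been read yet
    println(mapped.collect())  // the data flows only now
  }
}
```

Chaining two map calls builds two MappedMiniRDD objects but reads nothing; the source is touched only when collect pulls on the outermost iterator.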
1.2.2 preservesPartitioning
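preservesPartitioning indicates whether the parent RDD's partitioner is kept.
If it is false (the default), the resulting RDD has no partitioner (None). The data is not repartitioned: getPartitions still returns the parent's partitions. Spark simply stops assuming that the keys are laid out according to the parent's partitioner, because the map function may have changed them. So map-family transformations drop the partitioner by default.
If it is true, the partitioner is kept, taken from firstParent.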
    /** Returns the first parent RDD */
    protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
      dependencies.head.rdd.asInstanceOf[RDD[U]]
    }
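As you can see, dependencies is the sequence of this RDD's direct dependencies, so head is the first direct parent. When partitioning is preserved, it is the direct parent RDD's partitioner that is kept.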
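The effect of the flag can be modeled without Spark. The sketch below is an assumption-laden toy: HashPartitionerModel, Meta and mapPartitionsMeta are hypothetical names that only mirror the partitioner and getPartitions lines of MapPartitionsRDD quoted earlier.

```scala
object PartitionerSketch {
  // Hypothetical stand-in for a hash partitioner: key -> partition index.
  final case class HashPartitionerModel(numPartitions: Int) {
    def getPartition(key: Any): Int = {
      val mod = key.hashCode % numPartitions
      if (mod < 0) mod + numPartitions else mod
    }
  }

  // Toy RDD metadata: the partition count and, optionally, the partitioner
  // that describes how keys were placed.
  final case class Meta(numPartitions: Int, partitioner: Option[HashPartitionerModel])

  // Mirrors the two overrides in MapPartitionsRDD.
  def mapPartitionsMeta(parent: Meta, preservesPartitioning: Boolean): Meta =
    Meta(
      numPartitions = parent.numPartitions, // partitions always come from the parent
      partitioner = if (preservesPartitioning) parent.partitioner else None)

  def main(args: Array[String]): Unit = {
    val parent = Meta(4, Some(HashPartitionerModel(4)))
    println(mapPartitionsMeta(parent, preservesPartitioning = false))
    println(mapPartitionsMeta(parent, preservesPartitioning = true))
  }
}
```

Dropping the partitioner is the safe default because a map function may change the key: in this model key 1 lives in partition 1, but after mapping it to key 2 the record is still physically in partition 1 while the partitioner would place key 2 in partition 2. In Spark, mapValues cannot touch the key and passes preservesPartitioning = true for that reason.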
1.2.3 override def compute(split: Partition, context: TaskContext)
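Now let's see how MapPartitionsRDD overrides the compute logic.
firstParent[T].iterator(split, context) obtains an iterator over the parent RDD's data for the given partition. compute is called once per partition, and f is applied to that iterator.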
    /**
     * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
     * This should ''not'' be called by users directly, but is available for implementors of custom
     * subclasses of RDD.
     */
    final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
      if (storageLevel != StorageLevel.NONE) {
        getOrCompute(split, context)
      } else {
        computeOrReadCheckpoint(split, context)
      }
    }
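It first tries to read the parent RDD from the cache.
If the storage level is anything other than NONE, it tries to read from the cache (getOrCompute).
If the storage level is NONE, it still tries to read from a checkpoint (computeOrReadCheckpoint).
In short, Spark's principle is: never recompute what can be read back.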
For an RDD that is not persisted, compute effectively walks the dependency chain recursively (compute -> firstParent.iterator -> compute).

    compute => firstParent.iterator => getOrCompute (cache lookup, computes on a miss) => computeOrReadCheckpoint
    computeOrReadCheckpoint => (isCheckpointedAndMaterialized) => firstParent.iterator
                            => (otherwise)                     => this.compute
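The recursion and the effect of persistence can be sketched with a toy model in plain Scala. Everything here is hypothetical (MiniRDD, Source, Mapped, the persisted flag, the computeCalls counter); there is a single partition, an in-memory Option stands in for the block manager, and the checkpoint branch is left out entirely.

```scala
object IteratorChainSketch {
  // Toy RDD with one partition. `persisted` plays the role of
  // storageLevel != StorageLevel.NONE.
  abstract class MiniRDD[T] {
    var persisted = false
    var computeCalls = 0
    private var cached: Option[Vector[T]] = None

    protected def computeImpl(): Iterator[T]

    final def compute(): Iterator[T] = { computeCalls += 1; computeImpl() }

    // Same shape as RDD.iterator: cache path if persisted, otherwise compute.
    final def iterator(): Iterator[T] =
      if (persisted) getOrCompute() else compute()

    // Return the cached block, or compute it once and remember it.
    private def getOrCompute(): Iterator[T] = cached match {
      case Some(block) => block.iterator
      case None =>
        val block = compute().toVector
        cached = Some(block)
        block.iterator
    }
  }

  final class Source(data: Seq[Int]) extends MiniRDD[Int] {
    protected def computeImpl(): Iterator[Int] = data.iterator
  }

  // compute -> parent.iterator -> parent's compute -> ... up the lineage.
  final class Mapped(parent: MiniRDD[Int], f: Int => Int) extends MiniRDD[Int] {
    protected def computeImpl(): Iterator[Int] = parent.iterator().map(f)
  }

  def main(args: Array[String]): Unit = {
    val source = new Source(Seq(1, 2, 3))
    val a = new Mapped(source, _ + 1)
    a.persisted = true
    val b = new Mapped(a, _ * 10)
    println(b.iterator().toList)
    println(b.iterator().toList)
    println((source.computeCalls, a.computeCalls, b.computeCalls))
  }
}
```

Pulling on b twice recomputes b each time, since it is not persisted, but a and everything above it are computed only once: the second pass stops at a's cached block.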
getOrCompute
Note the val cleanF = sc.clean(f) here.
sc.clean(f) is defined as follows: