[Spark] Source Code Analysis: RDD Transformations

Date: 2021-11-22 17:17:37

  

    1. Map-family transformations

    map is one of the classic RDD transformations, so let's start with it. Its source is as follows:

  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

  1.1  sc.clean(f)   

      The first thing that stands out is the call to clean. What does it do? Let's look at its definition:

      /**
       * Clean a closure to make it ready to be serialized and sent to tasks
       * (removes unreferenced variables in $outer's, updates REPL variables)
       * If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively
       * check to see if <tt>f</tt> is serializable and throw a <tt>SparkException</tt>
       * if not.
       */
      private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
        ClosureCleaner.clean(f, checkSerializable)
        f
      }

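    The comment is clear: clean tidies up a closure so that it can be serialized and sent to tasks. It is worth understanding why a closure has to be serialized and shipped to tasks in the first place.

      The closure passed to map is created in the driver program, but it is actually executed on the executors. That is why the closure has to be serialized and sent over the wire.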

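    The logic that prepares a closure for serialization lives in ClosureCleaner.clean.

      There is too much code to walk through here, but the idea is this. ClosureCleaner does not serialize the closure itself. It uses reflection and bytecode inspection to work out which fields of the enclosing objects ($outer) the closure really uses, and clears the ones it does not reference, so that they are not dragged along. If checkSerializable is set, it then trial-serializes the closure and throws a SparkException if that fails. The real serialization happens later, when the task is shipped: the function object is serialized together with the values it captured, and the executor deserializes that object and calls it.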
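      The serializability check can be illustrated without Spark. The sketch below is not ClosureCleaner: it only trial-serializes a few closures with plain Java serialization, used here as a stand-in for Spark's closure serializer. ClosureCheckSketch, Multiplier and isSerializable are hypothetical names made up for this example.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCheckSketch {
  // Hypothetical helper class; deliberately NOT Serializable.
  class Multiplier(val factor: Int)

  // Trial-serialize a closure, similar in spirit to what checkSerializable asks for.
  def isSerializable(closure: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(closure) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def demo(): (Boolean, Boolean, Boolean) = {
    val helper = new Multiplier(3)

    // Captures nothing from the enclosing scope.
    val noCapture: Int => Int = x => x + 1

    // Captures `helper`, so the whole Multiplier would have to be serialized too.
    val capturesHelper: Int => Int = x => x * helper.factor

    // Copy the needed value into a local first; only an Int is captured.
    val factor = helper.factor
    val capturesInt: Int => Int = x => x * factor

    (isSerializable(noCapture), isSerializable(capturesHelper), isSerializable(capturesInt))
  }
}
```

      Calling ClosureCheckSketch.demo() should give (true, false, true): what matters is what the closure captures, not what the function body looks like. This is also why copying a field into a local val before using it inside map is a common fix for "Task not serializable" errors.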

  1.2 MapPartitionsRDD     
        /**
         * An RDD that applies the provided function to every partition of the parent RDD.
         */
        private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
            var prev: RDD[T],
            f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
            preservesPartitioning: Boolean = false)
          extends RDD[U](prev) {

          override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

          override def getPartitions: Array[Partition] = firstParent[T].partitions

          override def compute(split: Partition, context: TaskContext): Iterator[U] =
            f(context, split.index, firstParent[T].iterator(split, context))

          override def clearDependencies() {
            super.clearDependencies()
            prev = null
          }
        }

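    1.2.1 This is MapPartitionsRDD. Note how it is described.

      prev is the parent RDD and f is the map function. That is all there is: MapPartitionsRDD holds no data of its own.

      This confirms once again that a transformation produces no data. It merely records the parent RDD and how to transform it; no dataset is generated at the transformation stage.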
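      The same idea can be shown with a toy model. The sketch below does not use Spark; LazyDataset, Source and Mapped are hypothetical classes that only mimic the shape of RDD and MapPartitionsRDD (a parent plus a function, no data).

```scala
// Toy model only: these classes are hypothetical and are not part of Spark.
object LazyTransformSketch {
  sealed trait LazyDataset[T] {
    // Analogous to RDD.compute: produce the data only when asked.
    def compute(): Iterator[T]

    // Analogous to RDD.map: just wrap the parent and the function.
    def map[U](f: T => U): LazyDataset[U] = new Mapped(this, f)

    // Analogous to an action: this is what finally pulls data through the chain.
    def collect(): List[T] = compute().toList
  }

  final class Source[T](data: Seq[T]) extends LazyDataset[T] {
    def compute(): Iterator[T] = data.iterator
  }

  // Holds only the parent and the function, like MapPartitionsRDD holds prev and f.
  final class Mapped[T, U](prev: LazyDataset[T], f: T => U) extends LazyDataset[U] {
    def compute(): Iterator[U] = prev.compute().map(f)
  }

  def demo(): (Int, List[Int], Int) = {
    var calls = 0 // counts how many times the mapped function really ran
    val mapped = new Source(Seq(1, 2, 3)).map { x => calls += 1; x * 2 }
    val callsBeforeAction = calls // nothing has run yet
    val result = mapped.collect() // the action triggers the computation
    (callsBeforeAction, result, calls)
  }
}
```

      LazyTransformSketch.demo() returns (0, List(2, 4, 6), 3): the function is not called when map is applied, only when collect pulls the data through.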

    1.2.2 preservesPartitioning

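      preservesPartitioning indicates whether the parent RDD's partitioner is kept.

       If false (the default), the resulting RDD has no partitioner (partitioner = None). The data is not repartitioned: getPartitions still returns the parent's partitions. The partitioner is dropped only because f may change the keys, so the parent's partitioner can no longer be trusted. Map-family transformations therefore drop the partitioner by default.
     If true, the partitioner is kept, and it is taken from firstParent: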
        /** Returns the first parent RDD */
        protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
          dependencies.head.rdd.asInstanceOf[RDD[U]]
        }       
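      As this shows, dependencies is the sequence of this RDD's dependencies, so head is the direct parent. When the partitioner is preserved, it is the direct parent's partitioner that is kept.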
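      Why is dropping the partitioner the safe default? A small Spark-free sketch may help. ToyHashPartitioner, partitionByKey and consistent are hypothetical helpers written for this illustration; they only model the invariant "every record sits in the partition its key hashes to".

```scala
// Toy model only: nothing here is a Spark class.
object PartitionerSketch {
  // Hypothetical stand-in for a hash partitioner: key -> partition index.
  final case class ToyHashPartitioner(numPartitions: Int) {
    def getPartition(key: Int): Int = Math.floorMod(key, numPartitions)
  }

  // Lay the records out the way partitioning by key would.
  def partitionByKey(data: Seq[(Int, String)], p: ToyHashPartitioner): Vector[Seq[(Int, String)]] =
    Vector.tabulate(p.numPartitions)(i => data.filter { case (k, _) => p.getPartition(k) == i })

  // True if every record is in the partition the partitioner assigns to its key.
  def consistent(parts: Vector[Seq[(Int, String)]], p: ToyHashPartitioner): Boolean =
    parts.zipWithIndex.forall { case (part, i) =>
      part.forall { case (k, _) => p.getPartition(k) == i }
    }

  def demo(): (Boolean, Boolean, Boolean) = {
    val p = ToyHashPartitioner(2)
    val parts = partitionByKey(Seq(1 -> "a", 2 -> "b", 3 -> "c", 4 -> "d"), p)

    // Like map: applied partition by partition, but free to rewrite the keys.
    val keysChanged = parts.map(_.map { case (k, v) => (k + 1, v) })

    // Like a values-only transformation: keys are untouched.
    val valuesChanged = parts.map(_.map { case (k, v) => (k, v.toUpperCase) })

    (consistent(parts, p), consistent(keysChanged, p), consistent(valuesChanged, p))
  }
}
```

      PartitionerSketch.demo() returns (true, false, true). The records never moved between partitions, yet after the keys changed the old partitioner no longer describes where a key lives, so advertising it would be wrong. A transformation that cannot touch keys can safely pass preservesPartitioning = true, which is what Spark's mapValues does.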
    1.2.3 override def compute(split: Partition, context: TaskContext)
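      Let's see how MapPartitionsRDD overrides the compute logic.
        firstParent[T].iterator(split, context) returns an iterator over the parent RDD's data for the given partition (split), and f is then applied to that iterator. compute handles one partition per call; it does not loop over all partitions.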
      
        /**
           * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
           * This should ''not'' be called by users directly, but is available for implementors of custom
           * subclasses of RDD.
           */
          final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
            if (storageLevel != StorageLevel.NONE) {
              getOrCompute(split, context)
            } else {
              computeOrReadCheckpoint(split, context)
            }
          }

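        iterator first tries to read the parent RDD from the cache.

          If the storage level is anything other than NONE, it goes through getOrCompute, which tries the cache first.

          If the storage level is NONE, it still tries to read from a checkpoint through computeOrReadCheckpoint.

          In short, Spark's principle is: never recompute what can be read back.

        For an RDD that has not been persisted, compute drives a recursive walk up the dependency chain (compute -> firstParent.iterator -> compute).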

          
compute => firstParent.iterator => getOrCompute (computes on a cache miss) => computeOrReadCheckpoint
                computeOrReadCheckpoint => (isCheckpointedAndMaterialized) => firstParent.iterator
                             => (otherwise) this.compute
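The call chain above can be sketched with a toy model. MiniRDD, Source and Mapped below are hypothetical classes, not Spark's; checkpointing is left out, and a plain in-memory map stands in for the block manager. The point is only to show the recursion through iterator and how a cached RDD cuts it short.

```scala
import scala.collection.mutable

// Toy model only: loosely mirrors RDD.iterator / getOrCompute / compute.
object IteratorChainSketch {
  abstract class MiniRDD[T] {
    var cached = false   // stands in for storageLevel != StorageLevel.NONE
    var computeCalls = 0 // how many times compute really ran
    private val cache = mutable.Map.empty[Int, Vector[T]]

    protected def compute(split: Int): Iterator[T]

    // Read from the cache if this RDD is marked as cached, otherwise compute.
    final def iterator(split: Int): Iterator[T] =
      if (cached) getOrCompute(split) else computeCounted(split)

    // Cache hit: return the stored partition. Cache miss: compute and store it.
    private def getOrCompute(split: Int): Iterator[T] =
      cache.getOrElseUpdate(split, computeCounted(split).toVector).iterator

    private def computeCounted(split: Int): Iterator[T] = {
      computeCalls += 1
      compute(split)
    }
  }

  final class Source(data: Vector[Vector[Int]]) extends MiniRDD[Int] {
    protected def compute(split: Int): Iterator[Int] = data(split).iterator
  }

  final class Mapped[T, U](prev: MiniRDD[T], f: T => U) extends MiniRDD[U] {
    // compute -> prev.iterator -> (prev's) compute: the recursion up the chain.
    protected def compute(split: Int): Iterator[U] = prev.iterator(split).map(f)
  }

  def demo(): (List[Int], List[Int], Int, Int) = {
    val source = new Source(Vector(Vector(1, 2), Vector(3, 4)))
    val doubled = new Mapped[Int, Int](source, _ * 2)
    doubled.cached = true
    val plusOne = new Mapped[Int, Int](doubled, _ + 1)

    val first = plusOne.iterator(0).toList  // fills doubled's cache
    val second = plusOne.iterator(0).toList // served from doubled's cache
    (first, second, doubled.computeCalls, plusOne.computeCalls)
  }
}
```

IteratorChainSketch.demo() returns (List(3, 5), List(3, 5), 1, 2): the uncached plusOne is recomputed on every pass, while the cached doubled is computed only once and the walk up the chain stops there.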
 
 


Note the val cleanF = sc.clean(f) here.

sc.clean(f) is defined as follows: