
Time: 2022-06-20 18:26:07






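aggregateByKey in PairRDDFunctions.scala: the aggregateByKey operator aggregates the values of each key using the given combine functions and a neutral "zero value". The function can return a result type U that differs from V, the type of the values in the RDD. As in scala.TraversableOnce, it needs one operation that merges a V into a U and one operation that merges two U's: the former merges values within a partition, the latter merges values across partitions. To avoid memory…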


    def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
        combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
      // Serialize the zero value to a byte array so that we can get a new clone of it on each key
      val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
      val zeroArray = new Array[Byte](zeroBuffer.limit)
      zeroBuffer.get(zeroArray)

      lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
      val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

      // We will clean the combiner closure later in `combineByKey`
      val cleanedSeqOp = self.context.clean(seqOp)
      combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
        cleanedSeqOp, combOp, partitioner)
    }
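
The split between the two operations is easier to see in a small example. The following is a minimal sketch, assuming a local Spark installation; the object name, the sample data and the `sumAndCount` helper are made up for illustration. It computes a per-key (sum, count) pair, so the result type U = (Int, Int) differs from the value type V = Int.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AggregateByKeyExample {
  // Hypothetical helper: per-key (sum, count), where U = (Int, Int) and V = Int.
  def sumAndCount(sc: SparkContext): Map[String, (Int, Int)] = {
    val scores = sc.parallelize(
      Seq(("a", 1), ("b", 4), ("a", 3), ("b", 6), ("a", 5)), numSlices = 2)
    scores.aggregateByKey((0, 0))(
      // seqOp: merge one value V into the accumulator U within a partition
      (acc, v) => (acc._1 + v, acc._2 + 1),
      // combOp: merge two accumulators U coming from different partitions
      (x, y) => (x._1 + y._1, x._2 + y._2)
    ).collect().toMap
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("aggregateByKey-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      sumAndCount(sc).foreach { case (key, (sum, count)) =>
        println(s"$key: sum=$sum count=$count avg=${sum.toDouble / count}")
      }
    } finally {
      sc.stop()
    }
  }
}
```

The zero value (0, 0) is neutral for both operations, which is what allows Spark to clone it for each key without changing the result.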



    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
      // groupByKey shouldn't use map side combine because map side combine does not
      // reduce the amount of data shuffled and requires all map side data be inserted
      // into a hash table, leading to more objects in the old gen.
      val createCombiner = (v: V) => CompactBuffer(v)
      val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
      val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
      val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
        createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
      bufs.asInstanceOf[RDD[(K, Iterable[V])]]
    }








    def mapPartitions[U: ClassTag](
        f: Iterator[T] => Iterator[U],
        preservesPartitioning: Boolean = false): RDD[U] = withScope {
      val cleanedF = sc.clean(f)
      new MapPartitionsRDD(
        this,
        (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
        preservesPartitioning)
    }



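foreach processes records one at a time, whereas foreachPartition can write a whole batch of records to a database or HBase at once, which can improve performance by 50% or more. The source code of foreach and foreachPartition in RDD.scala is as follows: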

    def foreach(f: T => Unit): Unit = withScope {
      val cleanF = sc.clean(f)
      sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
    }

    /**
     * Applies a function f to each partition of this RDD.
     */
    def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
      val cleanF = sc.clean(f)
      sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
    }
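
The batching pattern described above can be sketched as follows. This is an illustration only: `FakeConnection` and its `writeBatch` method are hypothetical stand-ins for a real database or HBase client, and the batch size and sample data are arbitrary. Accumulators count how many connections are opened and how many rows are written, so the effect of working per partition is visible.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ForeachPartitionExample {
  // Hypothetical stand-in for a database/HBase client; not a real API.
  class FakeConnection {
    def writeBatch(rows: Seq[Int]): Unit = () // a real client would send one batched request
    def close(): Unit = ()
  }

  // Returns (connections opened, rows written).
  def writeInBatches(sc: SparkContext, batchSize: Int): (Long, Long) = {
    val opened = sc.longAccumulator("connections opened")
    val written = sc.longAccumulator("rows written")
    val data = sc.parallelize(1 to 1000, numSlices = 4)

    data.foreachPartition { iter =>
      val conn = new FakeConnection // one connection per partition, not per record
      opened.add(1L)
      try {
        // Send the partition's records in groups of batchSize
        iter.grouped(batchSize).foreach { batch =>
          conn.writeBatch(batch)
          written.add(batch.size.toLong)
        }
      } finally {
        conn.close()
      }
    }
    (opened.sum, written.sum)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("foreachPartition-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val (connections, rows) = writeInBatches(sc, batchSize = 100)
      println(s"connections opened: $connections, rows written: $rows")
    } finally {
      sc.stop()
    }
  }
}
```

With foreach, the same job would have to acquire a connection (or at least issue a write) for each of the 1000 records; here the cost of connecting is paid once per partition.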


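(4) Use the coalesce operator to consolidate fragmented small files. By default coalesce does not trigger a shuffle; its basic job is to turn data with a higher degree of parallelism into data with a lower one, for example reducing 10,000 partitions to 100. coalesce returns a new RDD that is reduced to `numPartitions` partitions. This results in a narrow dependency: going from 1000 partitions to 100 involves no shuffle; instead, each of the 100 new partitions claims 10 of the current partitions. However, a drastic coalesce, such as numPartitions = 1, may cause the computation to run on fewer nodes than intended (a single node when numPartitions = 1). To avoid this, set shuffle = true. This adds a shuffle step, but the upstream partitions are still computed in parallel.

With shuffle = true it is also possible to coalesce to a larger number of partitions. This is useful when there are only a few partitions, say 100, some of which may be abnormally large: calling coalesce(1000, shuffle = true) yields 1000 partitions, with the data distributed by a hash partitioner. Note that the optional partition coalescer passed in must be serializable.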


    def coalesce(numPartitions: Int, shuffle: Boolean = false,
                 partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
                (implicit ord: Ordering[T] = null)
        : RDD[T] = withScope {
      require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
      if (shuffle) {
        /** Distributes elements evenly across output partitions, starting from a random partition. */
        val distributePartition = (index: Int, items: Iterator[T]) => {
          var position = (new Random(index)).nextInt(numPartitions)
          items.map { t =>
            // Note that the hash code of the key will just be the key itself. The HashPartitioner
            // will mod it with the number of total partitions.
            position = position + 1
            (position, t)
          }
        } : Iterator[(Int, T)]

        // include a shuffle step so that our upstream tasks are still distributed
        new CoalescedRDD(
          new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
          new HashPartitioner(numPartitions)),
          numPartitions,
          partitionCoalescer).values
      } else {
        new CoalescedRDD(this, numPartitions, partitionCoalescer)
      }
    }
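
The effect of the shuffle flag on the resulting partition count can be checked with a short experiment. The sketch below assumes a local Spark installation; the object name, the `partitionCounts` helper and the partition numbers are chosen only for illustration. Without a shuffle, coalesce can only reduce the number of partitions, so asking for more leaves the count unchanged; with shuffle = true the count can grow.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CoalesceExample {
  // Hypothetical helper: partition counts after three different coalesce calls.
  def partitionCounts(sc: SparkContext): (Int, Int, Int) = {
    val rdd = sc.parallelize(1 to 10000, numSlices = 100)

    // Narrow dependency: each of the 10 new partitions claims 10 parent partitions
    val fewer = rdd.coalesce(10)
    // Without a shuffle the number of partitions cannot grow; it stays at 100
    val notMore = rdd.coalesce(200)
    // With a shuffle the data is redistributed into 200 partitions
    val more = rdd.coalesce(200, shuffle = true)

    (fewer.getNumPartitions, notMore.getNumPartitions, more.getNumPartitions)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("coalesce-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val (fewer, notMore, more) = partitionCounts(sc)
      println(s"coalesce(10): $fewer partitions")
      println(s"coalesce(200): $notMore partitions")
      println(s"coalesce(200, shuffle = true): $more partitions")
    } finally {
      sc.stop()
    }
  }
}
```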



(5) Use the repartition operator. Under the hood it still calls coalesce, but it always passes shuffle = true, so repartition always triggers a shuffle. The code of repartition is as follows:

    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
      coalesce(numPartitions, shuffle = true)
    }
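
As a rough illustration of what the shuffle buys, the sketch below spreads an RDD with 2 partitions over 8 and reports how many elements land in each. It assumes a local Spark installation; the object name, the `sizesAfterRepartition` helper and the numbers are made up for the example. The exact per-partition sizes may vary between Spark versions, so only the partition count and the total are relied upon.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RepartitionExample {
  // Hypothetical helper: number of elements in each partition after repartition(8).
  def sizesAfterRepartition(sc: SparkContext): Array[Int] = {
    val rdd = sc.parallelize(1 to 1000, numSlices = 2)
    // Equivalent to rdd.coalesce(8, shuffle = true)
    val spread = rdd.repartition(8)
    // glom() turns each partition into one array, so its length is the partition size
    spread.glom().map(_.length).collect()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("repartition-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val sizes = sizesAfterRepartition(sc)
      println(s"partitions: ${sizes.length}, sizes: ${sizes.mkString(", ")}")
    } finally {
      sc.stop()
    }
  }
}
```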



    def repartitionAndSortWithinPartitions(partitioner: Partitioner): JavaPairRDD[K, V] = {
      val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
      repartitionAndSortWithinPartitions(partitioner, comp)
    }



    def persist(newLevel: StorageLevel): this.type = {
      if (isLocallyCheckpointed) {
        // This means the user previously called localCheckpoint(), which should have already
        // marked this RDD for persisting. Here we should override the old storage level with
        // one that is explicitly requested by the user (after adapting it to use disk).
        persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
      } else {
        persist(newLevel, allowOverride = false)
      }
    }





    def mapPartitionsWithIndex[U: ClassTag](
        f: (Int, Iterator[T]) => Iterator[U],
        preservesPartitioning: Boolean = false): RDD[U] = withScope {
      val cleanedF = sc.clean(f)
      new MapPartitionsRDD(
        this,
        (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
        preservesPartitioning)
    }