/**
 * :: DeveloperApi ::
 * An RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
 * tuple with the list of values for that key.
 *
 * Note: This is an internal API. We recommend users use RDD.cogroup(...) instead of
 * instantiating this directly.
 *
 * @param rdds parent RDDs.
 * @param part partitioner used to partition the shuffle output
 */
@DeveloperApi
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
  extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {
  // For example, `(k, a) cogroup (k, b)` produces k -> Array(ArrayBuffer as, ArrayBuffer bs).
  // Each ArrayBuffer is represented as a CoGroup, and the resulting Array as a CoGroupCombiner.
  // CoGroupValue is the intermediate state of each value before being merged in compute.
  private type CoGroup = CompactBuffer[Any]
  private type CoGroupValue = (Any, Int)  // Int is dependency number
  private type CoGroupCombiner = Array[CoGroup]
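  /*
   * Illustration (not part of the original source): for two parents rdd0 = {(k, 1), (k, 2)}
   * and rdd1 = {(k, "a")}, the combiner assembled for key k is, conceptually,
   *
   *   Array(CompactBuffer(1, 2),   // CoGroup of rdd0's values (dependency 0)
   *         CompactBuffer("a"))    // CoGroup of rdd1's values (dependency 1)
   *
   * and the intermediate CoGroupValue produced for the record (k, "a") is ("a", 1).
   */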
  private var serializer: Option[Serializer] = None
  /** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
  def setSerializer(serializer: Serializer): CoGroupedRDD[K] = {
    this.serializer = Option(serializer)
    this
  }

  override def getDependencies: Seq[Dependency[_]] = {
    rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
      // If this parent's partitioner matches the CoGroupedRDD's partitioner, the
      // dependency between them is a narrow (one-to-one) dependency
      if (rdd.partitioner == Some(part)) {
        logDebug("Adding one-to-one dependency with " + rdd)
        new OneToOneDependency(rdd)
      } else {
        // Otherwise it is a wide (shuffle) dependency
        logDebug("Adding shuffle dependency with " + rdd)
        new ShuffleDependency[K, Any, CoGroupCombiner](rdd, part, serializer)
      }
    }
  }
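  /*
   * Sketch (assumes a live SparkContext `sc`; not part of the original source): a parent that
   * already shares the target partitioner goes through the narrow branch above and avoids a
   * shuffle, while an unpartitioned parent falls into the ShuffleDependency branch.
   *
   *   val p = new org.apache.spark.HashPartitioner(4)
   *   val a = sc.parallelize(Seq(("k1", 1), ("k2", 2))).partitionBy(p)  // partitioner == Some(p)
   *   val b = sc.parallelize(Seq(("k1", "x"), ("k3", "y")))             // partitioner == None
   *   a.cogroup(b, p)  // a -> OneToOneDependency, b -> ShuffleDependency
   */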
  /*
   * Build the partition metadata for this RDD. Each partition is a CoGroupPartition(
   * idx: Int, val narrowDeps: Array[Option[NarrowCoGroupSplitDep]]), where idx is the
   * partition index and narrowDeps holds, per parent RDD, its narrow dependency (if any).
   */
  override def getPartitions: Array[Partition] = {
    val array = new Array[Partition](part.numPartitions)
    for (i <- 0 until array.length) {
      // Each CoGroupPartition will have a dependency per contributing RDD
      array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
        // Assume each RDD contributed a single dependency, and get it
        dependencies(j) match {
          // A shuffle dependency needs no parent split here, so return None
          case s: ShuffleDependency[_, _, _] =>
            None
          case _ =>
            // Otherwise it is a narrow dependency: record the parent RDD and its split
            Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
        }
      }.toArray)
    }
    array
  }
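  /*
   * Sketch (continuing the two-parent example, not part of the original source): if parent 0
   * shares the partitioner and parent 1 does not, partition i is described roughly as
   *
   *   CoGroupPartition(i, Array(
   *     Some(NarrowCoGroupSplitDep(rdd0, i, rdd0.partitions(i))),  // narrow parent keeps its split
   *     None))                                                     // shuffled parent needs no split
   */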
  override val partitioner: Some[Partitioner] = Some(part)
  // Compute a single partition, driven by the CoGroupPartition metadata built above
  override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
    val sparkConf = SparkEnv.get.conf
    // This flag decides whether the intermediate merge runs purely in memory or in
    // memory plus disk (spilling to an external map)
    val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
    val split = s.asInstanceOf[CoGroupPartition]
    // Number of parent RDDs; each parent maps to one dependency under the partitioner
    val numRdds = dependencies.length

    // A list of (rdd iterator, dependency number) pairs: each entry pairs an iterator
    // over a parent's Product2 records with that parent's dependency index
    val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
    for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
      // Narrow dependency: read the corresponding partition of the parent RDD directly
      case oneToOneDependency: OneToOneDependency[Product2[K, Any]] =>
        val dependencyPartition = split.narrowDeps(depNum).get.split
        // Read them from the parent
        val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
        rddIterators += ((it, depNum))

      // Wide dependency: fetch this partition's values from the shuffle map outputs
      case shuffleDependency: ShuffleDependency[_, _, _] =>
        // Read map outputs of shuffle
        val it = SparkEnv.get.shuffleManager
          .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
          .read()
        rddIterators += ((it, depNum))
    }
    /*
     * At this point, for two parent RDDs, rddIterators holds
     * [(Iterator[Product2[K, Any]], 0), (Iterator[Product2[K, Any]], 1)]
     */
    if (!externalSorting) {
      // Merge the intermediate results entirely in memory.
      // A CoGroupCombiner is an array of buffers; each key is kept in the map only once.
      val map = new AppendOnlyMap[K, CoGroupCombiner]
      val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => {
        if (hadVal) oldVal else Array.fill(numRdds)(new CoGroup)
      }
      val getCombiner: K => CoGroupCombiner = key => {
        map.changeValue(key, update)
      }
      // Walk over each iterator and append every value to the CoGroup matching its
      // key and dependency index
      rddIterators.foreach { case (it, depNum) =>
        while (it.hasNext) {
          val kv = it.next()
          getCombiner(kv._1)(depNum) += kv._2
        }
      }
      new InterruptibleIterator(context,
        map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
    } else {
      // Merge the intermediate results in memory plus disk (external spilling)
      val map = createExternalMap(numRdds)
      // Insert all records into the ExternalAppendOnlyMap, tagging each value with its
      // dependency index, then record the spill metrics
      for ((it, depNum) <- rddIterators) {
        map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
      }
      context.taskMetrics.incMemoryBytesSpilled(map.memoryBytesSpilled)
      context.taskMetrics.incDiskBytesSpilled(map.diskBytesSpilled)
      new InterruptibleIterator(context,
        map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
    }
  }

  ……
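  /*
   * A standalone analogy of the in-memory merge above (a sketch, not part of this file),
   * using a plain mutable.HashMap in place of AppendOnlyMap to show the same
   * "create-combiner-on-first-key, append-per-dependency" pattern:
   *
   *   import scala.collection.mutable
   *
   *   def mergeInMemory[K](iters: Seq[(Iterator[(K, Any)], Int)],
   *                        numRdds: Int): Iterator[(K, Array[mutable.ArrayBuffer[Any]])] = {
   *     val map = mutable.HashMap.empty[K, Array[mutable.ArrayBuffer[Any]]]
   *     for ((it, depNum) <- iters; (k, v) <- it) {
   *       // first time we see k, create one empty buffer per parent RDD
   *       val combiner = map.getOrElseUpdate(k, Array.fill(numRdds)(mutable.ArrayBuffer.empty[Any]))
   *       combiner(depNum) += v  // append the value under its dependency slot
   *     }
   *     map.iterator
   *   }
   */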
}
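// Usage sketch (spark-shell style, assumes a SparkContext `sc`; not part of the original
// source). RDD.cogroup is the recommended entry point; each grouped Iterable is backed by a
// CompactBuffer, which is what the printed output shows. Element order may vary.
//
//   val a = sc.parallelize(Seq(("k1", 1), ("k1", 2), ("k2", 3)))
//   val b = sc.parallelize(Seq(("k1", "x"), ("k3", "y")))
//   a.cogroup(b).collect()
//   // => Array((k1,(CompactBuffer(1, 2),CompactBuffer(x))),
//   //          (k2,(CompactBuffer(3),CompactBuffer())),
//   //          (k3,(CompactBuffer(),CompactBuffer(y))))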