【Spark | Spark-Core篇】转换算子Transformation-3. Key-Value类型

时间:2024-10-20 07:16:34

3.1 partitionBy

源码:

函数说明:

将数据按照指定的分区器Partitioner重新分区。spark默认的分区器是HashPartitioner。

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

3.2 reduceByKey

源码:

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }

函数说明:

可以将数据按照相同的key对value进行聚合。

object Spark_09_RDD_reduceByKey_Transformation {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")

    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List("A", "B", "A", "A", "E"), 2)

    println(rdd.map((_, 1)).reduceByKey(_ + _).collect().mkString)

    sc.stop()
  }
}

3.3 groupByKey

源码:

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(new HashPartitioner(numPartitions))
  }

函数说明:

将数据源的数据,相同的key的数据分在一个组中,形成一个对偶元组。

元组的第一个元素就是key

元组的第二个参数就是相同的key的value的集合。

将分区的数据直接转换为相同类型的内存数组进行后续处理。

object Spark_09_RDD_groupByKey_Transformation {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")

    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List("A", "B", "A", "A", "E"), 2)

    rdd.map((_, 1)).groupByKey(2).collect().foreach(println)
    // numPartitions分区个数
//    (B,CompactBuffer(1))
//    (A,CompactBuffer(1, 1, 1))
//    (E,CompactBuffer(1))

    sc.stop()
  }
}

3.4 aggregateByKey

reduceByKey支持分区内预聚合的功能,可以有效减少shuffle时落盘的数据量,提升性能。但它要求分区内和分区间的聚合逻辑相同。

源码:

def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
    val zeroArray = new Array[Byte](zeroBuffer.limit)
    zeroBuffer.get(zeroArray)

    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
    val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

    // We will clean the combiner closure later in `combineByKey`
    val cleanedSeqOp = self.context.clean(seqOp)
    combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
      cleanedSeqOp, combOp, partitioner)
  }

函数说明:

将数据根据不同的规则进行分区内计算和分区间计算。

object Spark_10_RDD_aggregateByKey_Transformation {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")

    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List("A", "B", "A", "A", "E"), 2)

    println(rdd.map((_, 1)).aggregateByKey(0)(_+_, _+_).collect().mkString(","))
    // (B,1),(A,3),(E,1)
    // 第一个括号内的是计算分区内和分区间的初始值。
    // 第二个括号内第一个参数是规定分区内的计算逻辑
    // 第二个括号内第二个参数是规定分区间的计算逻辑
    sc.stop()
  }
}