3.1 partitionBy
源码:
函数说明:
将数据按照指定的分区器Partitioner重新分区。spark默认的分区器是HashPartitioner。
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
3.2 reduceByKey
源码:
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
函数说明:
可以将数据按照相同的key对value进行聚合。
object Spark_09_RDD_reduceByKey_Transformation {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List("A", "B", "A", "A", "E"), 2)
println(rdd.map((_, 1)).reduceByKey(_ + _).collect().mkString)
sc.stop()
}
}
3.3 groupByKey
源码:
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(new HashPartitioner(numPartitions))
}
函数说明:
将数据源的数据,相同的key的数据分在一个组中,形成一个对偶元组。
元组的第一个元素就是key
元组的第二个参数就是相同的key的value的集合。
将分区的数据直接转换为相同类型的内存数组进行后续处理。
object Spark_09_RDD_groupByKey_Transformation {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List("A", "B", "A", "A", "E"), 2)
rdd.map((_, 1)).groupByKey(2).collect().foreach(println)
// numPartitions分区个数
// (B,CompactBuffer(1))
// (A,CompactBuffer(1, 1, 1))
// (E,CompactBuffer(1))
sc.stop()
}
}
3.4 aggregateByKey
reduceByKey支持分区内预聚合的功能,可以有效减少shuffle时落盘的数据量,提升性能。但它要求分区内和分区间的聚合逻辑相同。
源码:
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
// Serialize the zero value to a byte array so that we can get a new clone of it on each key
val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
val zeroArray = new Array[Byte](zeroBuffer.limit)
zeroBuffer.get(zeroArray)
lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
// We will clean the combiner closure later in `combineByKey`
val cleanedSeqOp = self.context.clean(seqOp)
combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
cleanedSeqOp, combOp, partitioner)
}
函数说明:
将数据根据不同的规则进行分区内计算和分区间计算。
object Spark_10_RDD_aggregateByKey_Transformation {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List("A", "B", "A", "A", "E"), 2)
println(rdd.map((_, 1)).aggregateByKey(0)(_+_, _+_).collect().mkString(","))
// (B,1),(A,3),(E,1)
// 第一个括号内的是计算分区内和分区间的初始值。
// 第二个括号内第一个参数是规定分区内的计算逻辑
// 第二个括号内第二个参数是规定分区间的计算逻辑
sc.stop()
}
}