package com.lyzx.day37

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

class D1 {

  // Use partitionBy with a custom partitioner to fix data skew.
  def f1(sc: SparkContext): Unit = {
    val r1 = sc.parallelize(1 to 100, 4)
    val filterRdd = r1.filter(_ > 49)

    // Print each element with its partition index. Map over the iterator
    // instead of draining it with a while loop first; an iterator can only
    // be traversed once, so the original while + for-yield returned an
    // empty iterator.
    filterRdd.mapPartitionsWithIndex((idx, itr) => {
      itr.map { v =>
        println("[" + idx + "]" + v)
        v
      }
    }).collect()

    println(filterRdd.partitions.length)

    // After the filter the data is skewed (most of it sits in the last two
    // partitions). Redistribute it evenly with the custom round-robin
    // partitioner defined below.
    val reP = filterRdd
      .map(x => (x, null))
      .partitionBy(new P(r1.partitions.length))
      .map(x => x._1)

    reP.mapPartitionsWithIndex((idx, itr) => {
      itr.map { v =>
        println("[" + idx + "]" + v)
        v
      }
    }).collect()
  }

  // Operate only on the values of (K, V) pairs, leaving the keys untouched.
  def f2(sc: SparkContext): Unit = {
    val r1 = sc.parallelize(1 to 10).map(x => (x, x))

    r1.mapValues(_ + 1)
      .foreach(println)

    println("====================================")

    // flatMapValues maps each value to a collection and then flattens it,
    // pairing every element of that collection with the original key,
    // i.e. producing (K1, V1), (K1, V2), ...
    r1.flatMapValues(x => Seq(x + 100, x))
      .foreach(println)
  }
}

object D1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("day36").setMaster("local")
    val sc = new SparkContext(conf)
    val t = new D1

    // t.f1(sc)
    t.f2(sc)

    sc.stop()
  }
}

// Custom partitioner that spreads records evenly across partitions.
class P(num: Int) extends Partitioner {
  private var N: Int = 0

  override def numPartitions: Int = num

  // Ignores the key entirely and assigns records round-robin, purely by the
  // order in which they arrive.
  override def getPartition(key: Any): Int = {
    val currentIndex = N % num
    N += 1
    N = N % num // keep N within 0 .. num-1 so it cannot overflow Int
    currentIndex
  }
}
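
// ---------------------------------------------------------------------------
// Hypothetical sketch (not part of the original file): one way to check that
// the round-robin partitioner above really spreads the skewed data evenly.
// Counting records per partition before and after partitionBy makes the skew
// and its removal visible. Object and method names here are illustrative only.
// ---------------------------------------------------------------------------
object D1PartitionCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("d1-partition-check").setMaster("local")
    val sc = new SparkContext(conf)

    // 1..100 in 4 partitions; filtering > 49 leaves the first two partitions
    // almost empty, which is the skew that f1() demonstrates.
    val skewed = sc.parallelize(1 to 100, 4).filter(_ > 49)

    // Count the elements in each partition.
    def sizes(rdd: org.apache.spark.rdd.RDD[Int]): Array[(Int, Int)] =
      rdd.mapPartitionsWithIndex((idx, itr) => Iterator((idx, itr.size))).collect()

    println("before: " + sizes(skewed).mkString(", "))

    // Redistribute with the round-robin partitioner P defined above.
    val balanced = skewed
      .map(x => (x, null))
      .partitionBy(new P(4))
      .map(_._1)

    println("after:  " + sizes(balanced).mkString(", "))

    sc.stop()
  }
}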