Spark: Solving Data Skew with a Custom Partitioner

Time: 2022-06-02 20:57:31
package com.lyzx.day37

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

class D1 {

  //use partitionBy together with a custom partitioner to fix a skewed partition layout
  def f1(sc:SparkContext): Unit ={
     val r1 = sc.parallelize(1 to 100,4)
     val filterRdd = r1.filter(_ > 49)
      // Print each element together with the index of the partition it lives in;
      // the pass-through map returns the data unchanged, and collect() forces evaluation.
      filterRdd.mapPartitionsWithIndex((idx, itr) =>
        itr.map { v =>
          println("[" + idx + "]" + v)
          v
        }
      ).collect()
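      // The printout above shows the skew: 1 to 100 is split into 4 ranges
      // (1-25, 26-50, 51-75, 76-100), so filtering out everything <= 49 leaves
      // partition 0 empty, partition 1 with only 50, and partitions 2 and 3 with 25 elements each.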
    println(filterRdd.partitions.length)
    val reP = filterRdd
      .map(x => (x, null))                        // attach a dummy value so partitionBy can be applied
      .partitionBy(new P(r1.partitions.length))   // redistribute with the custom round-robin partitioner
      .map(x => x._1)                             // drop the dummy value again

    // Print the redistributed data together with its new partition index.
    reP.mapPartitionsWithIndex((idx, itr) =>
      itr.map { v =>
        println("[" + idx + "]" + v)
        v
      }
    ).collect()
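    // A quick check (not in the original code): count how many elements end up in each
    // partition after partitionBy, to see how evenly the 51 surviving values are spread.
    reP.mapPartitionsWithIndex((idx, itr) => Iterator((idx, itr.size)))
       .collect()
       .foreach(println)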
  }


  //mapValues operates only on the values of (K,V) pairs; the keys are left untouched
  def f2(sc:SparkContext): Unit ={
    val r1 = sc.parallelize(1 to 10).map(x=>(x,x))
    r1.mapValues(_+1)
      .foreach(println)
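    // With keys 1..10 this prints (1,2), (2,3), ..., (10,11): keys unchanged, each value +1.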
    println("====================================")

    //flatMapValues maps each value to a collection and then flattens it:
    //every element of the collection is paired with the original key, giving (K1,V1),(K1,V2),...
    r1.flatMapValues(x=>Seq(x+100,x))
        .foreach(println)
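    // Here each pair (k,v) expands to (k,v+100) and (k,v), e.g. (1,101),(1,1),(2,102),(2,2),...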
  }


}

object D1{

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("day36").setMaster("local")
    val sc = new SparkContext(conf)
    val t = new D1

//    t.f1(sc)
    t.f2(sc)
    sc.stop()
  }
}

//Custom partitioner that spreads the records evenly across the partitions
class P(num:Int) extends Partitioner{
  private var N:Int = 0               // running counter of records assigned so far
  override def numPartitions: Int = num

  //Ignore the key entirely: records are sent to partitions round-robin, purely by the
  //order in which they arrive, so the partition sizes stay balanced regardless of key values.
  override def getPartition(key: Any): Int = {
    val currentIndex = (N % num)
    N += 1
    N = N % num  // keep N within 0 .. num-1 so the counter never overflows Int
    currentIndex
  }
}
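
As a quick sanity check (not part of the original listing), the sketch below reuses the P partitioner defined above and compares the per-partition element counts before and after partitionBy; glom() gathers each partition into an array so its size can be printed. The object name PartitionCheck and the app name are illustrative placeholders.

import org.apache.spark.{SparkConf, SparkContext}

object PartitionCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("partition-check").setMaster("local"))

    // Skewed input: after the filter, the first two partitions hold 0 and 1 elements.
    val skewed = sc.parallelize(1 to 100, 4).filter(_ > 49)
    println("before: " + skewed.glom().map(_.length).collect().mkString(", "))

    // Redistribute with the round-robin partitioner P defined above.
    val evened = skewed.map(x => (x, null)).partitionBy(new P(4)).map(_._1)
    println("after:  " + evened.glom().map(_.length).collect().mkString(", "))

    sc.stop()
  }
}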