Saving a Spark RDD to Different Files by Key

Date: 2024-03-30 18:07:18

Requirement

Save a keyed RDD[(Key, Value)] to a different output file per key. In this example the key is departmentId_classId, so, for instance, all rows of Class1 in the Economy department end up in a file prefixed with Economy_Class1.

Test Data

Data format: id,studentId,language,math,english,classId,departmentId

1,111,68,69,90,Class1,Economy
2,112,73,80,96,Class1,Economy
3,113,90,74,75,Class1,Economy
4,114,89,94,93,Class1,Economy
5,115,99,93,89,Class1,Economy
6,121,96,74,79,Class2,Economy
7,122,89,86,85,Class2,Economy
8,123,70,78,61,Class2,Economy
9,124,76,70,76,Class2,Economy
10,211,89,93,60,Class1,English
11,212,76,83,75,Class1,English
12,213,71,94,90,Class1,English
13,214,94,94,66,Class1,English
14,215,84,82,73,Class1,English
15,216,85,74,93,Class1,English
16,221,77,99,61,Class2,English
17,222,80,78,96,Class2,English
18,223,79,74,96,Class2,English
19,224,75,80,78,Class2,English
20,225,82,85,63,Class2,English
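
The data contains four distinct (departmentId, classId) combinations, so the job below produces four keys: Economy_Class1, Economy_Class2, English_Class1, and English_Class2. As a quick sanity check of the key derivation used in the code, here is a standalone sketch (`row` is a hypothetical sample line):

// takeRight(2) picks (classId, departmentId); reverse + mkString yields "departmentId_classId"
val row = "1,111,68,69,90,Class1,Economy"
val key = row.split(",").takeRight(2).reverse.mkString("_")
// key == "Economy_Class1"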

Implementation with Spark RDD

package com.bigData.spark

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.log4j.{Level, Logger}
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

/**
  * Author: Wang Pei
  * License: Copyright(c) Pei.Wang
  * Summary:
  *   Save an RDD to different files by key
  */
object OutputToMultiFile {
  def main(args: Array[String]): Unit = {


    /** Set the log level */
    Logger.getLogger("org").setLevel(Level.WARN)

    /** Spark session */
    val spark = SparkSession.builder()
      .master("local[3]")
      .appName(this.getClass.getSimpleName.replace("$", ""))
      .getOrCreate()

    /** Build the keyed RDD */
    val data = spark.sparkContext.textFile("data/scores.csv")
      // key = departmentId_classId (the last two fields, reversed)
      .map(item => (item.split(",").takeRight(2).reverse.mkString("_"), item))
      // Hash-partition by key. With 4 partitions the 4 keys usually land in
      // separate partitions, but a hash collision could place two keys in the
      // same partition; the output format below still writes them to separate
      // files, because the file name is derived from the key.
      .partitionBy(new HashPartitioner(4))


    /** Save to a different file per key */
    data.saveAsHadoopFile("data/multiKeyedDir",
      classOf[String],
      classOf[String],
      classOf[PairRDDMultipleTextOutputFormat]
    )

    spark.stop()

  }

}

/** Subclass MultipleTextOutputFormat and override its hooks */
class PairRDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // 1) File name: build the output file name from the key; `name` is the
  //    default part-file name for the partition, e.g. part-00000.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
    key.asInstanceOf[String] + "-" + name
  }

  // 2) File content: by default both key and value are written. Returning null
  //    here suppresses the key, so only the value (the original CSV line) is written.
  override def generateActualKey(key: Any, value: Any): String = {
    null
  }
}
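
To verify the output, each key's rows can be read back from its own files. A minimal sketch, assuming the default part-file naming (the glob pattern is illustrative; run this before spark.stop() or with a fresh context):

val economyClass1 = spark.sparkContext.textFile("data/multiKeyedDir/Economy_Class1-part-*")
// Prints only rows whose departmentId is Economy and classId is Class1
economyClass1.collect().foreach(println)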
