Basic Requirement
Save a keyed RDD[(Key, Value)] to different output files according to the key.
Test Data
Data format: id,studentId,language,math,english,classId,departmentId
1,111,68,69,90,Class1,Economy
2,112,73,80,96,Class1,Economy
3,113,90,74,75,Class1,Economy
4,114,89,94,93,Class1,Economy
5,115,99,93,89,Class1,Economy
6,121,96,74,79,Class2,Economy
7,122,89,86,85,Class2,Economy
8,123,70,78,61,Class2,Economy
9,124,76,70,76,Class2,Economy
10,211,89,93,60,Class1,English
11,212,76,83,75,Class1,English
12,213,71,94,90,Class1,English
13,214,94,94,66,Class1,English
14,215,84,82,73,Class1,English
15,216,85,74,93,Class1,English
16,221,77,99,61,Class2,English
17,222,80,78,96,Class2,English
18,223,79,74,96,Class2,English
19,224,75,80,78,Class2,English
20,225,82,85,63,Class2,English
Implementation with Spark RDD
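The job below keys each line by departmentId_classId, built from the last two CSV fields in reverse order. As a quick sanity check, here is that expression applied to the first test row (a minimal sketch; the variable names are illustrative):

val row = "1,111,68,69,90,Class1,Economy"
val key = row.split(",").takeRight(2).reverse.mkString("_")
// key == "Economy_Class1"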
package com.bigData.spark

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.log4j.{Level, Logger}
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

object OutputToMultiFile {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)

    val spark = SparkSession.builder()
      .master("local[3]")
      .appName(this.getClass.getSimpleName.replace("$", ""))
      .getOrCreate()

    // Key each line by "departmentId_classId" (the last two CSV fields, reversed),
    // e.g. "Economy_Class1", and keep the full line as the value. partitionBy
    // groups all records with the same key into the same partition.
    val data = spark.sparkContext.textFile("data/scores.csv")
      .map(item => (item.split(",").takeRight(2).reverse.mkString("_"), item))
      .partitionBy(new HashPartitioner(4))

    // saveAsHadoopFile with a custom MultipleTextOutputFormat routes records
    // to per-key files instead of plain part files.
    data.saveAsHadoopFile("data/multiKeyedDir",
      classOf[String],
      classOf[String],
      classOf[PairRDDMultipleTextOutputFormat]
    )

    spark.stop()
  }
}

class PairRDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Name each output file "<key>-<partFileName>", e.g. "Economy_Class1-part-00000".
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
    val fileNamePrefix = key.asInstanceOf[String]
    val fileName = fileNamePrefix + "-" + name
    fileName
  }

  // Return null so only the value (the original line) is written,
  // without the key and the tab separator.
  override def generateActualKey(key: Any, value: Any): String = {
    null
  }
}
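With four hash partitions and the output format above, data/multiKeyedDir should contain files such as Economy_Class1-part-00000 and English_Class2-part-00003; the exact part numbers depend on which partition each key hashes to.

For comparison, a similar layout can be produced with the DataFrame writer, whose partitionBy splits output into one directory per column value. This is a minimal sketch assuming Spark 2.x; the column names follow the data format above, and the object name and output path are illustrative:

package com.bigData.spark

import org.apache.spark.sql.SparkSession

object OutputToMultiFileSql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[3]")
      .appName("OutputToMultiFileSql")
      .getOrCreate()

    // Read the CSV and name the columns according to the data format above.
    val df = spark.read
      .csv("data/scores.csv")
      .toDF("id", "studentId", "language", "math", "english", "classId", "departmentId")

    // partitionBy writes one subdirectory per (departmentId, classId) pair,
    // e.g. data/multiKeyedDirSql/departmentId=Economy/classId=Class1/part-....csv
    df.write
      .partitionBy("departmentId", "classId")
      .csv("data/multiKeyedDirSql")

    spark.stop()
  }
}

Note that partitionBy yields one directory per key combination rather than a per-key file name prefix, and the partition columns are dropped from the file contents.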