详解 Spark 核心编程之 RDD 序列化
/*
简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化
*/
object TestKryoSerializable {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("Ser")
// 替换默认的序列化机制
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册需要使用 kryo 序列化的自定义类,该类必须混入 Serializable 特质
conf.registerKryoClasses(Array(classOf[Searcher]))
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.makeRDD(Array(
"hello world", "hello spark",
"kafka", "hive"
), 2)
val searcher = new Searcher("h")
val result: RDD[String] = searcher.getMatchedRDD1(rdd)
result.collect.foreach(println)
}
}
case class Searcher(val query: String) {
def isMatch(s: String) = {
s.contains(query) // this.query
}
def getMatchedRDD1(rdd: RDD[String]) = {
rdd.filter(isMatch)
}
def getMatchedRDD2(rdd: RDD[String]) = {
val q = query
rdd.filter(_.contains(q))
}
}