详解 Spark 核心编程之 RDD 序列化

时间:2024-06-01 07:06:36
/* 简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化 */ object TestKryoSerializable { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("Ser") // 替换默认的序列化机制 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 注册需要使用 kryo 序列化的自定义类,该类必须混入 Serializable 特质 conf.registerKryoClasses(Array(classOf[Searcher])) val sc = new SparkContext(conf) val rdd: RDD[String] = sc.makeRDD(Array( "hello world", "hello spark", "kafka", "hive" ), 2) val searcher = new Searcher("h") val result: RDD[String] = searcher.getMatchedRDD1(rdd) result.collect.foreach(println) } } case class Searcher(val query: String) { def isMatch(s: String) = { s.contains(query) // this.query } def getMatchedRDD1(rdd: RDD[String]) = { rdd.filter(isMatch) } def getMatchedRDD2(rdd: RDD[String]) = { val q = query rdd.filter(_.contains(q)) } }