对于优化<网络性能>极为重要,将RDD以序列化格式来保存减少内存占用.
spark.serializer=org.apache.spark.serializer.JavaSerialization
:
Spark默认 使用Java自带的ObjectOutputStream 框架来序列化对象,这样任何实现了 java.io.Serializable 接口的对象,都能被序列化。同时,还可以通过扩展 java.io.Externalizable 来控制序列化性能。Java序列化很灵活但性能差速度很慢,同时序列化后占用的字节数也较多。
spark.serializer=org.apache.spark.serializer.KryoSerialization
:
KryoSerialization速度快,可以配置为任何org.apache.spark.serializer的子类。但Kryo也不支持所有实现了 java.io.Serializable 接口的类型,它需要你在程序中 register 需要序列化的类型,以得到最佳性能。
LZO的支持要求先安装 Hadoop-lzo包(每个节点), 并放到 Spark本地库中。如果是Debian包安装,在调用spark-submit时加上 --driver-library-path /usr/lib/hadoop/lib/native/ --driver-class-path /usr/lib/hadoop/lib/ 就可以。 下载lzo http://cn.jarfire.org/hadoop.lzo.html
在 SparkConf 初始化的时候调用 conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”) 使用 Kryo。这个设置不仅控制各个worker节点之间的混洗数据序列化格式,同时还控制RDD存到磁盘上的序列化格式。需要在使用时注册需要序列化的类型,建议在对网络敏感的应用场景下使用Kryo。
如果你的自定义类型需要使用Kryo序列化,可以用 registerKryoClasses 方法先注册:
val conf = new SparkConf.setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
最后,如果你不注册需要序列化的自定义类型,Kryo也能工作,不过每一个对象实例的序列化结果都会包含一份完整的类名,这有点浪费空间。
在Scala中使用New API (Twitter Elephant Bird 包) lzo JsonInputFormat读取 LZO 算法压缩的 JSON 文件:
val input = sc.newAPIHadoopFile(inputFile, classOf[lzoJsonInputFormat], classOf[LongWritable], classOf[MapWritable], conf)
inputFile: 输入路径
接收第一个类:“格式”类,输入格式
接收第二个类:“键”
接收第二个类:“值”
conf:设置一些额外的压缩选项
在Scala中使用老API直接读取 KeyValueTextInputFormat()最简单的Hadoop输入格式 :
val input = sc.HadoopFile[Text, Text, KeyValueTextInputFormat](inputFile).map{ case (x, y) => (x.toString, y.toString) }
注:如果读取单个压缩过的输入,做好不要考虑使用Spark的封装(textFile/SequenceFile..),而是使用 newAPIHadoopFile 或者 HadoopFile,并指定正确的压缩解码器。 有些输入格式(如SequenceFile)允许我们只压缩键值对数据中的值,这在查询时很有用。其它一些输入格式也有自己的压缩控制,如:Twitter Elephant Bird 包中的许多格式都可以使用LZO算法压缩数据。