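When I use Spark RDD operators in my daily work, I often feel unsure of myself. Thinking it over, I realized that I don't understand the operators thoroughly enough. I don't have a firm memory of each operator's inputs and outputs, and I lack an understanding of their underlying source code. So I decided that, in my future work and study, I would gradually write up a summary of each operator, focusing on three aspects: first, the operator's inputs and outputs; second, techniques that are easily overlooked in everyday use; third, hands-on examples that demonstrate the operator's characteristics.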
Spark RDD common programming interface: Partitions.
scala> val par=sc.textFile("/user/README.md")
par: org.apache.spark.rdd.RDD[String] = /user/README.md MapPartitionsRDD[5] at textFile at <console>:24
scala> par.partitions.size
res2: Int = 2
scala> val par=sc.textFile("/user/README.md",6)
par: org.apache.spark.rdd.RDD[String] = /user/README.md MapPartitionsRDD[7] at textFile at <console>:24
scala> par.partitions.size
res3: Int = 6
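`partitions` returns the RDD's array of partitions, so `partitions.size` gives the number of partitions. The optional second argument of `textFile` is `minPartitions`, a suggested minimum number of partitions. Without it, the file above was read into 2 partitions; with 6, it was read into 6.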
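A minimal sketch of inspecting partitions, assuming spark-core is on the classpath (in spark-shell, `getOrCreate` simply returns the existing `sc`; the app name and data are arbitrary). Unlike `textFile`'s `minPartitions`, which is only a hint, `parallelize`'s `numSlices` sets the partition count exactly, and `getNumPartitions` is a shorthand for `partitions.size`.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// spark-shell: returns the shell's existing context; standalone: starts a local one
val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("rdd-notes"))

// numSlices fixes the partition count of an in-memory collection exactly
val nums = sc.parallelize(1 to 100, 4)
println(nums.partitions.size)   // 4
println(nums.getNumPartitions)  // 4, same value via the shorthand

// Each Partition object carries its index within the RDD
println(nums.partitions.map(_.index).mkString(","))  // 0,1,2,3
```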
Spark RDD common programming interface: Dependencies.
scala> val pairrdd=par.flatMap(_.split(" ")).map(x=>(x,1))
pairrdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at map at <console>:26
scala> pairrdd.dependencies.foreach{dep=>
| println("dependency type: "+dep.getClass)
| println("dependency RDD: "+dep.rdd)
| println("dependency partitions: "+dep.rdd.partitions)
| println("dependency partitions size: "+dep.rdd.partitions.length)
| }
dependency type: class org.apache.spark.OneToOneDependency
dependency RDD: MapPartitionsRDD[10] at flatMap at <console>:26
dependency partitions: [Lorg.apache.spark.Partition;@23551fa1
dependency partitions size: 6
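`dependencies` returns the list of an RDD's dependencies on its parent RDDs. Here pairrdd has a single dependency: a OneToOneDependency on the flatMap RDD, which has the same 6 partitions.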
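Because each `Dependency` exposes its parent through `dep.rdd`, following `dependencies` recursively reconstructs an RDD's whole lineage. A minimal sketch, assuming spark-core on the classpath. `lineage` is a hypothetical helper name and the input strings are made up. The built-in `toDebugString` prints a similar, more detailed tree.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("rdd-notes"))

// Hypothetical helper: walk rdd.dependencies depth-first, recording each RDD with its depth
def lineage(rdd: RDD[_], depth: Int = 0): List[(Int, String)] =
  (depth, rdd.toString) :: rdd.dependencies.toList.flatMap(dep => lineage(dep.rdd, depth + 1))

val words = sc.parallelize(Seq("a b", "b c"), 2).flatMap(_.split(" ")).map(w => (w, 1))

// Indent each RDD by its distance from `words`
lineage(words).foreach { case (depth, desc) => println("  " * depth + desc) }

// Built-in alternative that prints the lineage tree
println(words.toDebugString)
```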
scala> val reducerdd=pairrdd.reduceByKey(_+_)
reducerdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[12] at reduceByKey at <console>:28
scala> reducerdd.dependencies.foreach{dep=>
| println("dependency type: "+dep.getClass)
| println("dependency RDD: "+dep.rdd)
| println("dependency partitions: "+dep.rdd.partitions)
| println("dependency partitions size: "+dep.rdd.partitions.length)
| }
dependency type: class org.apache.spark.ShuffleDependency
dependency RDD: MapPartitionsRDD[11] at map at <console>:26
dependency partitions: [Lorg.apache.spark.Partition;@23551fa1
dependency partitions size: 6
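After reduceByKey, the dependency type changes. pairrdd depends on its parent through a OneToOneDependency (a narrow dependency), while reducerdd depends on pairrdd through a ShuffleDependency (a wide dependency). Everything else is essentially unchanged; the parent still has 6 partitions.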
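The narrow/wide distinction can be checked programmatically: `OneToOneDependency` is a subclass of `NarrowDependency`, while shuffles show up as `ShuffleDependency`. A minimal sketch, assuming spark-core on the classpath. `depKinds` is a hypothetical helper and the sample words are arbitrary.

```scala
import org.apache.spark.{NarrowDependency, ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("rdd-notes"))

// Hypothetical helper: label each direct dependency of an RDD as narrow or wide
def depKinds(rdd: RDD[_]): List[String] = rdd.dependencies.toList.map {
  case _: ShuffleDependency[_, _, _] => "wide"
  case _: NarrowDependency[_]        => "narrow"   // includes OneToOneDependency
  case other                         => other.getClass.getSimpleName
}

val pairs   = sc.parallelize(Seq("a", "b", "a"), 3).map(w => (w, 1))
val reduced = pairs.reduceByKey(_ + _)

println(depKinds(pairs))    // List(narrow)
println(depKinds(reduced))  // List(wide)
```

A wide dependency is where Spark splits the job into a new stage, which is why this check is useful when reading lineage.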
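Spark RDD common programming interface: partition computation (iterator).
I have not yet gotten this example to work, so I am setting it aside for now.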
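Calling `RDD.iterator(split, context)` directly from the shell is awkward. As far as I understand the source, it expects a `TaskContext` and is designed to run inside a task, where it either reads a cached or checkpointed partition or calls `compute`. One workaround that makes the per-partition iterator visible is `mapPartitionsWithIndex`, which hands your function each partition's index together with an `Iterator` over its elements. This is a substitute technique, not the `iterator` method itself. The sketch assumes spark-core on the classpath and uses arbitrary data.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("rdd-notes"))

val rdd = sc.parallelize(1 to 10, 3)

// The function receives (partition index, iterator over that partition's elements)
val perPartition = rdd.mapPartitionsWithIndex { (idx, it) =>
  Iterator((idx, it.toList))   // materialize each partition's iterator for display
}.collect()

perPartition.foreach { case (idx, elems) => println(s"partition $idx -> $elems") }
```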
Spark RDD common programming interface: Partitioner.
scala> par.partitioner
res15: Option[org.apache.spark.Partitioner] = None
scala> var grouprdd=par.map(x=>(x,x)).groupByKey(new org.apache.spark.HashPartitioner(4))
grouprdd: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[15] at groupByKey at <console>:26
scala> grouprdd.partitioner
res16: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@4)
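`partitioner` shows an RDD's partitioner. The RDD read by textFile has none (`None`), whereas the RDD returned by groupByKey with a `HashPartitioner(4)` reports `Some(HashPartitioner)`.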
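Whether an RDD keeps its partitioner also depends on the operator applied next. In this sketch, `mapValues` cannot change keys, so the partitioner is kept, while `map` might change keys, so it is dropped. It assumes spark-core on the classpath, and the key/value pairs are made up.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("rdd-notes"))

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)), 2)
println(pairs.partitioner)   // None: parallelize sets no partitioner

val byKey = pairs.partitionBy(new HashPartitioner(4))
println(byKey.partitioner)   // Some(org.apache.spark.HashPartitioner@4)

// mapValues leaves keys untouched, so the partitioner survives
val kept = byKey.mapValues(_ + 1)
// map could rewrite keys, so Spark drops the partitioner
val dropped = byKey.map { case (k, v) => (k, v + 1) }

println(kept.partitioner.isDefined)     // true
println(dropped.partitioner.isDefined)  // false
```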
Operations for creating RDDs
parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
makeRDD[T](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
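With these signatures, the two calls behave the same: same partition count, same contents, and no preferred locations on the partitions. A minimal sketch, assuming spark-core on the classpath and arbitrary data.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("rdd-notes"))

val a = sc.parallelize(Seq(1, 2, 3, 4, 5), 2)
val b = sc.makeRDD(Seq(1, 2, 3, 4, 5), 2)

println(a.getNumPartitions == b.getNumPartitions)   // true
println(a.collect().sameElements(b.collect()))      // true

// Neither form attaches location preferences to its partitions
println(a.preferredLocations(a.partitions(0)).isEmpty)  // true
```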
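This form of makeRDD behaves just like parallelize. However, makeRDD also has an overload, makeRDD[T](seq: Seq[(T, Seq[String])]): RDD[T], that lets you specify preferred locations for each partition, i.e., which nodes the partition should preferably be placed on. For example: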
scala> var collect=Seq((1 to 10,Seq("master","slave1")),(11 to 15,Seq("slave1","slave2")))
collect: Seq[(scala.collection.immutable.Range.Inclusive, Seq[String])] = List((Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),List(master, slave1)), (Range(11, 12, 13, 14, 15),List(slave1, slave2)))
scala> var makerdd=sc.makeRDD(collect)
makerdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[16] at makeRDD at <console>:26
scala> makerdd.partitions.size
res18: Int = 2
scala> makerdd.preferredLocations(makerdd.partitions(0))
res19: Seq[String] = List(master, slave1)
scala> makerdd.preferredLocations(makerdd.partitions(1))
res20: Seq[String] = List(slave1, slave2)
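This is a powerful feature, since it lets you specify where partitions are placed. Note that these locations are preferences the scheduler tries to honor for data locality, not guarantees.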
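One detail that is easy to miss with this overload: each element of the input `Seq` becomes its own partition, and each `Range` stays a single record rather than being expanded into integers. A minimal sketch, assuming spark-core on the classpath. The host names are hypothetical placeholders; in local mode the scheduler simply runs the tasks wherever it can.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("rdd-notes"))

val withLocs = sc.makeRDD(Seq(
  (1 to 10, Seq("host-a", "host-b")),   // hypothetical host names
  (11 to 15, Seq("host-b", "host-c"))
))

println(withLocs.getNumPartitions)  // 2: one partition per element
println(withLocs.count())           // 2: each Range is one record

// Expand the ranges when the individual numbers are what you need
val flat = withLocs.flatMap(_.toList)
println(flat.count())               // 15

println(withLocs.preferredLocations(withLocs.partitions(1)))  // List(host-b, host-c)
```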
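Among the operators that create RDDs from external storage, wholeTextFiles deserves special mention:
wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
Unlike textFile, which yields one record per line, it yields one (file path, file content) pair per file, which makes it suitable for directories of many small files.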
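A minimal sketch contrasting the two readers, assuming local mode with spark-core on the classpath so that driver and executors see the same local temporary directory. The file names and contents are made up for illustration.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("rdd-notes"))

// Write two small sample files into a fresh temporary directory
val dir = Files.createTempDirectory("whole-text-demo")
Files.write(dir.resolve("a.txt"), "hello spark\nsecond line\n".getBytes(StandardCharsets.UTF_8))
Files.write(dir.resolve("b.txt"), "another file\n".getBytes(StandardCharsets.UTF_8))
val path = dir.toUri.toString   // explicit file:// URI, so the default FS is not consulted

// One record per file: (file path, entire content)
val files = sc.wholeTextFiles(path).collect()
files.foreach { case (p, content) => println(s"$p -> ${content.length} chars") }

// textFile over the same directory yields one record per line instead
val lineCount = sc.textFile(path).count()
println(lineCount)   // 3
```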
Transformation operators
distinct(numPartitions: Int): RDD[T]. In everyday use, it is easy to overlook that this operator can also repartition its result, so I want to stress that here.
scala> var b=makerdd.distinct(5)
b: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = MapPartitionsRDD[22] at distinct at <console>:28
scala> b.partitions.size
res23: Int = 5
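For comparison, here is a minimal sketch of `distinct()` without an argument, which keeps the parent's partition count, and `distinct(n)`, which deduplicates and repartitions in the same step. It assumes spark-core on the classpath and uses arbitrary data.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("rdd-notes"))

val data = sc.parallelize(Seq(1, 2, 2, 3, 3, 3), 2)

val d1 = data.distinct()    // keeps the parent's partition count
val d5 = data.distinct(5)   // deduplicates and sets 5 output partitions

println(d1.getNumPartitions)                 // 2
println(d5.getNumPartitions)                 // 5
println(d5.collect().sorted.mkString(","))   // 1,2,3
```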
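coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]. An earlier post already gave an example of this operator, so I won't repeat it here; I only want to stress its repartitioning function.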
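The `shuffle` flag matters for repartitioning. Without a shuffle, coalesce can only merge partitions, so asking for more partitions leaves the count unchanged, and `repartition(n)` is the same as `coalesce(n, shuffle = true)`. A minimal sketch, assuming spark-core on the classpath.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("rdd-notes"))

val rdd = sc.parallelize(1 to 100, 8)

println(rdd.coalesce(2).getNumPartitions)                   // 2: merges partitions, no shuffle
println(rdd.coalesce(16).getNumPartitions)                  // 8: cannot grow without a shuffle
println(rdd.coalesce(16, shuffle = true).getNumPartitions)  // 16
println(rdd.repartition(16).getNumPartitions)               // 16
```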
glom(): RDD[Array[T]]. Note that it turns the elements of each partition into a single array, so the result holds one array per partition.
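A minimal sketch of glom, assuming spark-core on the classpath and arbitrary data. It also shows one typical use: computing an aggregate per partition, here the maximum of each non-empty partition.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("rdd-notes"))

val rdd = sc.parallelize(1 to 10, 3)

// One inner array per partition
val arrays = rdd.glom().collect()
println(arrays.length)   // 3
arrays.zipWithIndex.foreach { case (arr, i) => println(s"partition $i: ${arr.mkString(",")}") }

// Per-partition maximum, skipping empty partitions (max on an empty array would throw)
val maxima = rdd.glom().filter(_.nonEmpty).map(_.max).collect()
```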
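randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]. It randomly splits the RDD according to the given weights; the weights are normalized if they do not sum to 1.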
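A minimal sketch of an 80/20 split, assuming spark-core on the classpath. The weights 8.0 and 2.0 are normalized, and a fixed seed (42 here, an arbitrary choice) makes the split reproducible. Every element lands in exactly one of the resulting RDDs.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("rdd-notes"))

val rdd = sc.parallelize(1 to 1000, 4)

// Weights 8:2 are normalized to 0.8 / 0.2
val splits = rdd.randomSplit(Array(8.0, 2.0), seed = 42L)
val train = splits(0)
val test  = splits(1)

println(train.count() + test.count())   // 1000
```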
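intersection is worth emphasizing because it is easy to overlook that it lets you choose the number of partitions or supply a partitioner to suit your needs: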
intersection(other: RDD[T], numPartitions: Int): RDD[T]
intersection(other: RDD[T], partitioner: Partitioner): RDD[T]
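A minimal sketch of both overloads, assuming spark-core on the classpath and arbitrary data. Note that the result of intersection contains no duplicates even when the inputs do.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("rdd-notes"))

val a = sc.parallelize(Seq(1, 2, 3, 4, 5, 5, 6), 2)
val b = sc.parallelize(Seq(4, 5, 6, 7), 3)

val byCount       = a.intersection(b, 6)                      // choose the partition count
val byPartitioner = a.intersection(b, new HashPartitioner(4)) // supply a partitioner

println(byCount.getNumPartitions)                 // 6
println(byPartitioner.getNumPartitions)           // 4
println(byCount.collect().sorted.mkString(","))   // 4,5,6
```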
subtract works the same way:
subtract(other: RDD[T], numPartitions: Int): RDD[T]
subtract(other: RDD[T], partitioner: Partitioner): RDD[T]
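The same two options for subtract, sketched under the same assumptions (spark-core on the classpath, arbitrary data). The result keeps the elements of `a` that do not appear in `b`.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("rdd-notes"))

val a = sc.parallelize(1 to 10, 2)
val b = sc.parallelize(Seq(2, 4, 6), 1)

val s6 = a.subtract(b, 6)                       // choose the partition count
val s3 = a.subtract(b, new HashPartitioner(3))  // supply a partitioner

println(s6.getNumPartitions)                // 6
println(s3.getNumPartitions)                // 3
println(s6.collect().sorted.mkString(","))  // 1,3,5,7,8,9,10
```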
Open question: can you repartition and define the partitioner at the same time?
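One observation that may help with this question: a `Partitioner` itself reports `numPartitions`, so passing a partitioner to these overloads also fixes the partition count. Choosing the partitioner and the number of partitions are effectively one decision. The sketch below illustrates this with `ParityPartitioner`, a hypothetical custom partitioner written for this example (even keys go to partition 0, odd keys to partition 1). It assumes spark-core on the classpath and uses arbitrary data.

```scala
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("rdd-notes"))

// Hypothetical partitioner: its own numPartitions decides the output partition count
class ParityPartitioner extends Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int = key match {
    case i: Int => math.abs(i % 2)   // even -> 0, odd -> 1
    case _      => 0                 // non-Int keys all go to partition 0
  }
  override def equals(other: Any): Boolean = other.isInstanceOf[ParityPartitioner]
  override def hashCode: Int = numPartitions
}

val a = sc.parallelize(1 to 10, 3)
val b = sc.parallelize(Seq(2, 3), 1)

val diff = a.subtract(b, new ParityPartitioner)
println(diff.getNumPartitions)   // 2, taken from the partitioner
diff.glom().collect().zipWithIndex.foreach { case (arr, i) =>
  println(s"partition $i: ${arr.sorted.mkString(",")}")
}
```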