Summary of Spark Operator Usage

Date: 2023-01-11 20:47:20

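In day-to-day work I often find myself unsure when using Spark RDD operators. On reflection, the cause is that I do not understand the operators thoroughly enough, do not remember their inputs and outputs well, and am not familiar with their underlying source code. I have therefore decided to summarize the operators step by step in my future work and study, focusing on three things: first, each operator's input and output; second, useful details that are easy to overlook in everyday use; and third, hands-on examples that demonstrate each operator's behavior.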

Spark RDD common programming interface: Partitions

scala> val par = sc.textFile("/user/README.md")
par: org.apache.spark.rdd.RDD[String] = /user/README.md MapPartitionsRDD[5] at textFile at <console>:24

scala> par.partitions.size
res2: Int = 2

scala> val par = sc.textFile("/user/README.md", 6)
par: org.apache.spark.rdd.RDD[String] = /user/README.md MapPartitionsRDD[7] at textFile at <console>:24

scala> par.partitions.size
res3: Int = 6

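partitions.size returns the number of partitions in an RDD. The optional second argument to textFile is the minimum number of partitions (minPartitions).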
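As a minimal sketch of the same check without an input file, the snippet below builds an RDD from a local collection and reads its partition count in two ways. It assumes Spark is on the classpath; the application name and the local[2] master are illustrative choices, and in spark-shell getOrCreate simply returns the context that is already running.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the running context (for example in spark-shell) or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("partitions-demo").setMaster("local[2]"))

// Ask for an explicit number of slices when creating the RDD.
val nums = sc.parallelize(1 to 100, 4)

// Two equivalent ways to read the partition count.
val viaArray = nums.partitions.length
val viaHelper = nums.getNumPartitions

println(s"partitions.length = $viaArray, getNumPartitions = $viaHelper")
```

getNumPartitions avoids touching the Partition array directly and reads a little more clearly in application code.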

Spark RDD common programming interface: Dependencies

scala> val pairrdd = par.flatMap(_.split(" ")).map(x => (x, 1))
pairrdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at map at <console>:26

scala> pairrdd.dependencies.foreach { dep =>
     |   println("dependency type:" + dep.getClass)
     |   println("dependency RDD:" + dep.rdd)
     |   println("dependency partitions:" + dep.rdd.partitions)
     |   println("dependency partitions size:" + dep.rdd.partitions.length)
     | }
dependency type:class org.apache.spark.OneToOneDependency
dependency RDD:MapPartitionsRDD[10] at flatMap at <console>:26
dependency partitions:[Lorg.apache.spark.Partition;@23551fa1
dependency partitions size:6

dependencies returns the list of dependencies of an RDD on its parent RDDs.

scala> val reducerdd = pairrdd.reduceByKey(_ + _)
reducerdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[12] at reduceByKey at <console>:28

scala> reducerdd.dependencies.foreach { dep =>
     |   println("dependency type:" + dep.getClass)
     |   println("dependency RDD:" + dep.rdd)
     |   println("dependency partitions:" + dep.rdd.partitions)
     |   println("dependency partitions size:" + dep.rdd.partitions.length)
     | }
dependency type:class org.apache.spark.ShuffleDependency
dependency RDD:MapPartitionsRDD[11] at map at <console>:26
dependency partitions:[Lorg.apache.spark.Partition;@23551fa1
dependency partitions size:6

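reducerdd, the result of reduceByKey, depends on its parent pairrdd through a ShuffleDependency, whereas pairrdd depends on its own parent through a OneToOneDependency. The rest of the output is essentially unchanged: the parent still has 6 partitions.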
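The narrow versus shuffle distinction seen in the two transcripts can also be checked programmatically. The sketch below is one way to do it; dependencyKind is a hypothetical helper introduced only for this illustration, and the input strings are made up.

```scala
import org.apache.spark.{NarrowDependency, ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Reuse the running context (for example in spark-shell) or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("dependencies-demo").setMaster("local[2]"))

val words = sc.parallelize(Seq("a b", "b c", "c a"), 2)
val pairs = words.flatMap(_.split(" ")).map(w => (w, 1))
val counts = pairs.reduceByKey(_ + _)

// Hypothetical helper: classify the first dependency of an RDD.
def dependencyKind(rdd: RDD[_]): String =
  rdd.dependencies.head match {
    case _: ShuffleDependency[_, _, _] => "shuffle"
    case _: NarrowDependency[_]        => "narrow"
    case other                         => other.getClass.getSimpleName
  }

val pairsKind = dependencyKind(pairs)
val countsKind = dependencyKind(counts)
println(s"pairs: $pairsKind, counts: $countsKind")

// toDebugString prints the whole lineage of an RDD in one call.
println(counts.toDebugString)
```

OneToOneDependency is a subclass of NarrowDependency, which is why the map step is reported as narrow here.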

Spark RDD common programming interface: per-partition computation (Iterator)

I have not yet gotten this example to work, so it is set aside for now.
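The original experiment is not shown, so the following is not a reconstruction of it. It is a small sketch of a related, public API: mapPartitionsWithIndex hands the function one Iterator per partition, which is the same shape of data that an RDD's partition computation produces. The input range and partition count are arbitrary.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the running context (for example in spark-shell) or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("iterator-demo").setMaster("local[2]"))

val nums = sc.parallelize(1 to 10, 3)

// The function receives the partition index and an Iterator over that
// partition's elements, and must return an Iterator of results.
val sums = nums.mapPartitionsWithIndex { (idx, iter) =>
  Iterator((idx, iter.sum))
}.collect()

sums.foreach { case (idx, sum) => println(s"partition $idx -> sum $sum") }
```

Because the iterator is consumed lazily, a partition does not have to be materialized in memory to be summed this way.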

Spark RDD common programming interface: Partitioner

scala> par.partitioner
res15: Option[org.apache.spark.Partitioner] = None

 

scala> var grouprdd = par.map(x => (x, x)).groupByKey(new org.apache.spark.HashPartitioner(4))
grouprdd: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[15] at groupByKey at <console>:26

scala> grouprdd.partitioner
res16: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@4)

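partitioner returns the RDD's partitioner, if it has one: None for the RDD read with textFile, and Some(HashPartitioner) after groupByKey is called with an explicit HashPartitioner.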
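A related detail that is easy to miss is which transformations keep the partitioner. The sketch below, using made-up key/value pairs, compares mapValues (keys cannot change, so the partitioner is kept) with map (keys may change, so the partitioner is dropped).

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Reuse the running context (for example in spark-shell) or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("partitioner-demo").setMaster("local[2]"))

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)), 2)

// Explicitly hash-partition the pairs into 4 partitions.
val hashed = pairs.partitionBy(new HashPartitioner(4))

// mapValues cannot touch the keys, so the partitioner survives.
val afterMapValues = hashed.mapValues(_ + 1)

// map could change the keys, so Spark drops the partitioner.
val afterMap = hashed.map { case (k, v) => (k, v + 1) }

println(s"pairs:          ${pairs.partitioner}")
println(s"hashed:         ${hashed.partitioner}")
println(s"afterMapValues: ${afterMapValues.partitioner}")
println(s"afterMap:       ${afterMap.partitioner}")
```

Keeping the partitioner matters because a later key-based operation with the same partitioner can then avoid another shuffle.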

Operations for creating RDDs

parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]

makeRDD[T](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]

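However, makeRDD has a second overload, makeRDD[T](seq: Seq[(T, Seq[String])]): RDD[T], that lets you specify preferred locations for each partition, that is, a hint about which nodes the partition should be placed on. For example: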

scala> var collect = Seq((1 to 10, Seq("master", "slave1")), (11 to 15, Seq("slave1", "slave2")))
collect: Seq[(scala.collection.immutable.Range.Inclusive, Seq[String])] = List((Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),List(master, slave1)), (Range(11, 12, 13, 14, 15),List(slave1, slave2)))

scala> var makerdd = sc.makeRDD(collect)
makerdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[16] at makeRDD at <console>:26

scala> makerdd.partitions.size
res18: Int = 2

scala> makerdd.preferredLocations(makerdd.partitions(0))
res19: Seq[String] = List(master, slave1)

scala> makerdd.preferredLocations(makerdd.partitions(1))
res20: Seq[String] = List(slave1, slave2)

This is a powerful feature: it lets you specify a preferred location for each partition.

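Among the operators that create RDDs from external storage, wholeTextFiles deserves special mention: it reads a directory of text files and returns one (file path, file content) pair per file.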

wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
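To make the difference from textFile concrete, here is a small sketch that reads the same directory both ways. The temporary directory and the two file names are hypothetical, and the example assumes Spark runs in local mode so that the driver's local file system is visible to the tasks.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the running context (for example in spark-shell) or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("whole-text-files-demo").setMaster("local[2]"))

// Hypothetical input: a temporary directory holding two small text files.
val dir = Files.createTempDirectory("whole-text-files-demo")
Files.write(dir.resolve("a.txt"), "line 1\nline 2\n".getBytes(StandardCharsets.UTF_8))
Files.write(dir.resolve("b.txt"), "line 3\n".getBytes(StandardCharsets.UTF_8))

// Use a file: URI so the path is not resolved against another default file system.
val path = dir.toUri.toString

val lines = sc.textFile(path)        // one record per line
val files = sc.wholeTextFiles(path)  // one (path, content) record per file

val lineCount = lines.count()
val fileCount = files.count()
val contents = files.values.collect().sorted

println(s"textFile records: $lineCount, wholeTextFiles records: $fileCount")
```

Since each file becomes a single record, wholeTextFiles suits many small files better than a few very large ones.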

Transformation operators

distinct(numPartitions: Int): RDD[T]: in everyday use it is easy to overlook that this operator can also repartition its result, so I stress it here.

scala> var b = makerdd.distinct(5)
b: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = MapPartitionsRDD[22] at distinct at <console>:28

scala> b.partitions.size
res23: Int = 5

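coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]: an earlier article already gives an example of this operator, so it is not repeated here; the point to stress is its repartitioning capability.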
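For readers without the earlier article at hand, here is a minimal sketch of how the shuffle flag affects the resulting partition count. The data and the partition numbers are arbitrary.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the running context (for example in spark-shell) or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("coalesce-demo").setMaster("local[2]"))

val rdd = sc.parallelize(1 to 100, 6)

// Shrinking works without a shuffle.
val shrunk = rdd.coalesce(2).partitions.length

// Growing without a shuffle has no effect: the count stays at 6.
val notGrown = rdd.coalesce(10).partitions.length

// Growing requires shuffle = true (this is what repartition does).
val grown = rdd.coalesce(10, shuffle = true).partitions.length

println(s"shrunk = $shrunk, notGrown = $notGrown, grown = $grown")
```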

glom(): RDD[Array[T]]: the point to stress is that it turns the elements of each partition into a single array.
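A short sketch of glom on a small range shows one array per partition. The range and the partition count are arbitrary.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the running context (for example in spark-shell) or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("glom-demo").setMaster("local[2]"))

val nums = sc.parallelize(1 to 6, 3)

// Each partition becomes one Array, so the result has one element per partition.
val chunks = nums.glom().collect()

chunks.zipWithIndex.foreach { case (chunk, idx) =>
  println(s"partition $idx: ${chunk.mkString(", ")}")
}
```

This makes glom a convenient way to inspect how elements are distributed across partitions, for example when checking for skew.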

randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
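The signature alone does not show how the result is used, so here is a minimal sketch that splits an RDD roughly 70/30. The weights, the seed, and the names train and test are illustrative; the exact sizes of the two parts depend on the random draw, so only their sum is checked.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the running context (for example in spark-shell) or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("random-split-demo").setMaster("local[2]"))

val data = sc.parallelize(1 to 1000, 4)

// One RDD per weight; weights are normalized if they do not sum to 1.
val parts = data.randomSplit(Array(0.7, 0.3), seed = 42L)
val train = parts(0)
val test = parts(1)

val trainCount = train.count()
val testCount = test.count()

println(s"train = $trainCount, test = $testCount, total = ${trainCount + testCount}")
```

Fixing the seed makes the split reproducible across runs on the same input.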

intersection is worth stressing because it is easy to overlook that it lets you choose the number of partitions or the partitioner of the result:

intersection(other: RDD[T], numPartitions: Int): RDD[T]

intersection(other: RDD[T], partitioner: Partitioner): RDD[T]

subtract is similar:

subtract(other: RDD[T], numPartitions: Int): RDD[T]

subtract(other: RDD[T], partitioner: Partitioner): RDD[T]

Question: can a single call both repartition the result and specify the partitioner?
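One possible answer, offered as a sketch and not as a definitive statement: a Partitioner carries its own numPartitions, so the overloads that take a partitioner already fix the number of result partitions as well. The snippet below checks this for intersection and subtract with a HashPartitioner(3); the input ranges are arbitrary.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Reuse the running context (for example in spark-shell) or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("partitioner-overload-demo").setMaster("local[2]"))

val left = sc.parallelize(1 to 10, 2)
val right = sc.parallelize(6 to 15, 2)

// The partitioner decides both how keys are placed and how many partitions exist.
val partitioner = new HashPartitioner(3)

val common = left.intersection(right, partitioner)
val onlyLeft = left.subtract(right, partitioner)

val commonParts = common.partitions.length
val onlyLeftParts = onlyLeft.partitions.length
val commonValues = common.collect().sorted
val onlyLeftValues = onlyLeft.collect().sorted

println(s"intersection: $commonParts partitions, values ${commonValues.mkString(",")}")
println(s"subtract:     $onlyLeftParts partitions, values ${onlyLeftValues.mkString(",")}")
```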