RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。 A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable;可类比String,它也是不可变的,但是可有很多方法,如切分...
1. RDD的属性
每个属性对应一个方法,getPartitions: Array[Partition]、compute、getDependencies、Partitioner、 getPreferredLocations(每个分区对应一个Task,把Task发送到哪个位置记录下来)
* Internally, each RDD is characterized by five main properties:
* 1) - A list of partitions; 一组分区(Partition),即数据集的基本组成单位; 所有的RDD都有分区;
* 2) - A function for computing each split; 一个计算每个分区的函数;
* 3) - A list of dependencies on other RDDs; RDD之间的依赖关系,不是所有的RDD都有依赖;
* 4) - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned即Hash分区器);一个Partitioner,即RDD的分片函数;只有键值对RDD才有分区器
* 5) - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file); 一个列表,存储存取每个Partition的优先位置(preferred location)。(每个Task任务发送到离数据最近的位置--节点的Executor上),如果一个节点的Executor由于内存cpu等原因不能执行,
spark会对它有个降级,给同一个节点的另外一个Executor去执行,它如果还是不能执行就去同一个机架上的其他机器上的Executor(跨节点传输数据了),这又是一个降级;如果同一个机架上的都不行,则给同一个机房的其他机架上发,又是一个降级;
移动数据不如移动计算;
分区并行计算 task
2. RDD特点
RDD表示只读的分区的数据集,对RDD进行改动,只能通过RDD的转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必需的信息。RDDs之间存在依赖,RDD的执行是按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化RDD来切断血缘关系。
1)弹性
存储的弹性:内存与磁盘的自动切换;(可以基于内存也可以基于磁盘)
容错的弹性:数据丢失可以自动恢复;(RDD记录了数据怎么计算的,数据丢失了可在上一级自动恢复)
计算的弹性:计算出错重试机制;(Executor挂了,Driver可以转移到其他Executor)
分片的弹性:可根据需要重新分片。(总的数据量不变,分区数是可变的)
2)分区
RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据。如果RDD是通过已有的文件系统构建,则compute函数是读取(逻辑)指定文件系统中的数据,如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换。
3)只读
RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD。
由一个RDD转换到另一个RDD,可以通过丰富的操作算子实现,不再像MapReduce那样只能写map和reduce了。
RDD的操作算子包括两类,一类叫做transformations,它是用来将RDD进行转化,构建RDD的血缘关系(懒加载、懒执行,只有遇到action才会真正的执行);另一类叫做actions,它是用来触发RDD的计算,得到RDD的相关计算结果或者将RDD保存的文件系统中。
4)依赖
RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖。依赖包括两种,一种是窄依赖,RDDs之间分区是一一对应(从上游RDD看)的,(上游的某一个分区被下游的一个或多个分区所使用);
另一种是宽依赖,下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多的关系。
窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用,窄依赖我们形象的比喻为独生子女;
宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition,会引起shuffle,总结:宽依赖我们形象的比喻为超生
5)缓存
可以缓存到内存也可以缓存到磁盘,缓存没有删除依赖关系;任务执行完之后不管是缓存到内存还是磁盘,它都会被删除掉;
如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。如下图所示,RDD-1经过一系列的转换后得到RDD-n并保存到hdfs,RDD-1在这一过程中会有个中间结果,如果将其缓存到内存,那么在随后的RDD-1转换到RDD-m这一过程中,就不会计算其之前的RDD-0了。
----->R5
R1--->R2--->R3----->R4 ,RDD3缓存到内存计算1次即可,这样子R5、R6从内存掉即可;缓存默认是没开启的,需要调方法;
----->R6
RDD通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中。
但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
def cache(): this.type = persist()
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。
StorageLevel.scala 存储级别源码 ,在存储级别的末尾加上“_2”来把持久化数据存为两份
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
...
缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
例如:
scala> val rdd = sc.makeRDD(Array("kris"))
scala> val nocache = rdd.map(_.toString + System.currentTimeMillis)
scala> nocache.collect
res0: Array[String] = Array(kris1554979614968) scala> nocache.collect
res1: Array[String] = Array(kris1554979627951) scala> nocache.collect
res2: Array[String] = Array(kris1554979629257) scala> val cache = rdd.map(_.toString + System.currentTimeMillis).cache //将RDD转换为携带当前时间戳并做缓存
cache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[] at map at <console>: scala> cache.collect // 多次打印做了相同的缓存结果
res3: Array[String] = Array(kris1554979702053) scala> cache.collect
res4: Array[String] = Array(kris1554979702053)
RDD若缓存到磁盘(或者内存中),当任务跑完结束时,它会把整个缓存的目录都删除掉,以至于缓存不能被其他任务所使用;
6)RDD CheckPoint 检查点机制
缓存到HDFS上,文件一直都在;切断了依赖;
检查点(本质是通过将RDD写入Disk做检查点)是为了通过lineage做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能。
为当前RDD设置检查点。该函数将会创建一个二进制的文件,并存储到checkpoint目录中,该目录是用SparkContext.setCheckpointDir()设置的。在checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。同上CheckPoint也需要调用 rdd.checkpoint
虽然RDD的血缘关系天然地可以实现容错,当RDD的某个分区数据失败或丢失,可以通过血缘关系重建。但是对于长时间迭代型应用来说,随着迭代的进行,RDDs之间的血缘关系会越来越长,一旦在后续迭代过程中出错,则需要通过非常长的血缘关系去重建,势必影响性能。为此,RDD支持checkpoint将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为checkpoint后的RDD不需要知道它的父RDDs了,它可以从checkpoint处拿到数据。
如果依赖链特别长,可把上游那个存起来缓存起来,直接从缓存里边拿即可,就不会从头开始计算;checkPoint,把依赖给切断,给它缓存起来,下游的RDD对上游也没有依赖,直接从缓存中去取; 而缓存是没有切断依赖的;如果新起一个jar包,CheckPoint是它执行时可直接从缓存(如缓存到了HDFS)中拿, 而缓存(缓存是只有一个jar包中可用,其他任务不可用)还要从头进行计算(它有依赖关系);
sc.setCheckpointDir("./checkPoint") checkpoint是会再启一个进程再计算一次,所以它会计算2次;
一般CheckPoint会和缓存结合使用,这样子CheckPoint就只是计算一次了;
def main(args: Array[String]): Unit = {
//初始化sc
val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR") sc.setCheckpointDir("./checkPoint")
val rdd: RDD[Int] = sc.makeRDD(List()).map(x => {
println("计算一次")
x
})
rdd.cache() //sCheckpoint和缓存结合使用,就只计算一次
// rdd.persist(StorageLevel.DISK_ONLY)
rdd.checkpoint()
rdd.collect()
rdd.collect() }
CheckPoint和缓存都可以缓存到磁盘上,根本区别是CheckPoint切断了依赖,缓存的方式不管是缓存到内存还是磁盘,任务执行完之后Driver和Executor就会释放,它会把你缓存到磁盘的那个目录文件都删除;而CheckPoint会一直存在磁盘上;
缓存和checkpoint的区别
cache,persist:不会切断血缘关系,可以指定存储级别
checkpoint:切断血缘关系,持久化存储
3. RDD编程
编程模型
在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换。经过一系列的transformations定义RDD之后,就可以调用actions触发RDD的计算,action可以是向应用程序返回结果(count, collect等),或者是向存储系统保存数据(saveAsTextFile等)。在Spark中,只有遇到action,才会执行RDD的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。
要使用Spark,开发者需要编写一个Driver程序,它被提交到集群以调度运行Worker。Driver中定义了一个或多个RDD,并调用RDD上的action,Worker则执行RDD分区计算任务。
3.1 RDD的创建
在Spark中创建RDD的创建方式可以分为三种:从集合中(内存)创建RDD;从外部存储(HDFS、本地磁盘、mysql等)创建RDD;从其他RDD创建(转换算子、action算子)。
1)从内存中创建:
/** Distribute a local Scala collection to form an RDD.
* This method is identical to `parallelize`.
*/
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope { 可指定位置,发送到哪个分区的task,这种方法一般不用;
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
scala> val x = sc.makeRDD(List(,,,))
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[] at makeRDD at <console>: scala> x.collect
res0: Array[Int] = Array(, , , ) scala> val y = sc.parallelize(1 to 5).collect
y: Array[Int] = Array(, , , , ) scala> z.collect
res1: Array[String] = Array(Hello World, Hello java, Hello spark, "")
查看分区数
scala> x.getNumPartitions
res2: Int = scala> x.partitions.size
res4: Int =
3.2 默认分区规则:
① 从集合中创建默认分区规则:
在SparkContext中查找makeRDD
local模式分区数默认=核数;集群模式 math.max(totalCoreCount.get(),2)
numSlices: Int = defaultParallelism): RDD[T] = withScope
taskScheduler.defaultParallelism
def defaultParallelism(): Int ctrl+h 特质--看它的实现类
override def defaultParallelism(): Int = backend.defaultParallelism()
def defaultParallelism(): Int 特质
override def defaultParallelism(): Int =
scheduler.conf.getInt("spark.default.parallelism", totalCores) alt+<-返回;总核数totalCores 8个 def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope { ##defaultParallelism 8个
parallelize(seq, numSlices) #8个,如果从集合中创建RDD,Local模式的默认分区数是总核数
} CoarseGrainedSchedulerBackend yarn或standalone模式
override def defaultParallelism(): Int = {
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), )) #总核数与2取最大值
}
② 从文件系统中读默认分区规则:
scala> val z = sc.textFile("./wc.txt")
z: org.apache.spark.rdd.RDD[String] = ./wc.txt MapPartitionsRDD[] at textFile at <console>: scala> z.getNumPartitions
res6: Int =
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
} def defaultMinPartitions: Int = math.min(defaultParallelism, )
def defaultParallelism: Int = {
assertNotStopped()
taskScheduler.defaultParallelism
}
def defaultParallelism(): Int -->查看它的特质实现类 override def defaultParallelism(): Int = backend.defaultParallelism()
def defaultParallelism(): Int -->查看它的特质实现类
override def defaultParallelism(): Int =
scheduler.conf.getInt("spark.default.parallelism", totalCores) ##总核数
返回:def defaultMinPartitions: Int = math.min(defaultParallelism, ) ##defaultParallelism为8
textFile中: minPartitions: Int = defaultMinPartitions): RDD[String] = withScope 这个值为2
hadoopFile中用到这个方法
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped() new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
class HadoopRDD[K, V](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableConfiguration],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int) override def getPartitions: Array[Partition] = {
val inputSplits = inputFormat.getSplits(jobConf, minPartitions)} ## getSplits-->InputFormat-->FileInputFormat找getSplits方法
long goalSize = totalSize / (long)(numSplits == ? : numSplits); totalSize总大小wc.txt总共36字节,numSplits要传的参数2
long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize); --> private long minSplitSize = 1L;
long blockSize = file.getBlockSize(); #块大小,HDFS上128M,windows是32M
long splitSize = this.computeSplitSize(goalSize, minSize, blockSize); (, , ) -->
return Math.max(minSize, Math.min(goalSize, blockSize));
文件切片机制按1.1倍判断,
(double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize
/ > 1.1 -->-=/ <.1不切, 最终得到2片切片
3.3 RDD的转换
RDD整体上分为Value类型和Key-Value类型
Value类型
1.map(func)
作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
map:
scala> val x = sc.makeRDD(1 to 4) #sc.parallelize(1 to 4)
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[] at makeRDD at <console>: scala> x.map(x=>x.toString).collect
res9: Array[String] = Array(, , , ) scala> x.map(x=>(x,)).collect ##.map(_ * 2).collect()所有元素*2
res10: Array[(Int, Int)] = Array((,), (,), (,), (,))
scala> x.map((_,)).collect
res12: Array[(Int, Int)] = Array((,), (,), (,), (,))
2. mapPartitions(func)
mapPartitions:作用:类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。
假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。
/**
* Return a new RDD by applying a function to each partition of this RDD.
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}
scala> val x = sc.makeRDD( to )
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[] at makeRDD at <console>: scala> x.mapPartitions(x=>x.map((_,1))).collect ## x.mapPartitions((x=>x.map(_*2))).collect
res13: Array[(Int, Int)] = Array((,), (,), (,), (,), (,), (,), (,), (,)) scala> x.map((_, 1)).collect
res14: Array[(Int, Int)] = Array((,), (,), (,), (,), (,), (,), (,), (,))
map()和mapPartition()的区别 (①传入函数执行次数不同;②效率不一样,map执行完一条内存就释放而mapPartition是一个分区的处理完内存也不释放)
1. map():每次处理一条数据。(处理完一条内存就释放了)
2. mapPartition():每次处理一个分区的数据(相当于批处理,一个分区数据批处理完它的内存不会释放),这个分区的数据处理完后,原RDD中分区的数据才能释放,可能导致OOM。
3. 开发指导:当内存空间较大的时候建议使用mapPartition(),以提高处理效率。
3. mapPartitionsWithIndex(func)
作用:类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U];
/**
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U], ##int分区号(从0开始的),分区数据---->转换为U类型
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
preservesPartitioning)
}
scala> val x = sc.makeRDD(List("kris", "alex", "heihei"))
x: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[] at makeRDD at <console>: scala> x.mapPartitionsWithIndex((index,par)=>par.map((index,_))).collect
res15: Array[(Int, String)] = Array((,kris), (,alex), (,heihei))
4.flatMap(func) 可以一对一,也可以一对多
作用:类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope { ##函数返回的必须是可迭代的
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
scala> val x = sc.makeRDD( to )
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[] at makeRDD at <console>: scala> x.map(x=>x).collect
res16: Array[Int] = Array(, , , ) scala> x.flatMap(x=>x).collect
<console>:: error: type mismatch;
found : Int
required: TraversableOnce[?] ##需要可迭代的
x.flatMap(x=>x).collect
^
scala> x.flatMap(x=>x.toString).collect
res18: Array[Char] = Array(, , , ) scala> val x = sc.makeRDD(List(Array(,,,), Array(,,)))
x: org.apache.spark.rdd.RDD[Array[Int]] = ParallelCollectionRDD[] at makeRDD at <console>: scala> x.flatMap(x=>x).collect
res20: Array[Int] = Array(, , , , , , )
5.glom 实际开发中用的并不是很多
作用:将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
/**
* Return an RDD created by coalescing all elements within each partition into an array.
*/
def glom(): RDD[Array[T]] = withScope {
new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
scala> val x = sc.makeRDD( to )
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[] at makeRDD at <console>: scala> x.getNumPartitions
res21: Int = scala> x.glom.collect
res22: Array[Array[Int]] = Array(Array(), Array(), Array(), Array(, ), Array(), Array(), Array(), Array(, ))
6. groupBy(func)
作用:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
groupBy[K](f, defaultPartitioner(this))
}
scala> val x = sc.makeRDD( to )
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[] at makeRDD at <console>: scala> x.groupBy(_%2).collect
res23: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3, 5)))
scala> x.groupBy(x=>x>1).collect
res24: Array[(Boolean, Iterable[Int])] = Array((false,CompactBuffer()), (true,CompactBuffer(, , , )))
CompactBuffer
* An append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers.
private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable { }
指定在spark包下使用,似有的
7. filter(func)
作用:过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。
/**
* Return a new RDD containing only the elements that satisfy a predicate.
*/
def filter(f: T => Boolean): RDD[T] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[T, T](
this,
(context, pid, iter) => iter.filter(cleanF),
preservesPartitioning = true)
} scala> val x = sc.makeRDD(List("Hello Kris", "baidu", "jd"))
x: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[] at makeRDD at <console>: scala> x.filter(x=>x.contains("Kris")).collect
res28: Array[String] = Array(Hello Kris)
scala> val x = sc.makeRDD( to )
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[] at makeRDD at <console>: scala> x.filter(x => x>1).collect ## x.filter(x=>x%2).collect 这样子写就不可以了,它的返回值不是Boolean类型
res29: Array[Int] = Array(, , )
8. sample(withReplacement, fraction, seed)
放回和不放回抽样
作用:以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。 不放回的抽样,每个元素被抽中的概率[0,1]; 有放回的抽样,每个元素抽中的次数 >=0
源码如下
/**
* Return a sampled subset of this RDD.
*
* @param withReplacement can elements be sampled multiple times (replaced when sampled out)
* @param fraction expected size of the sample as a fraction of this RDD's size
* without replacement: probability that each element is chosen; fraction must be [0, 1]
* with replacement: expected number of times each element is chosen; fraction must be greater
* than or equal to 0
* @param seed seed for the random number generator
*
* @note This is NOT guaranteed to provide exactly the fraction of the count
* of the given [[RDD]].
*/
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T] = { #它如果是固定的,产生的随机数也是固定的
require(fraction >= ,
s"Fraction must be nonnegative, but got ${fraction}") withScope {
require(fraction >= 0.0, "Negative fraction value: " + fraction)
if (withReplacement) {
new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
} else {
new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
}
}
}
抽取的数的比例尽量靠近比例,不一定就是抽取的比例数
false 不放回抽样:
scala> val x = sc.makeRDD( to )
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[] at makeRDD at <console>:
scala> x.sample(false,,).collect
res33: Array[Int] = Array(, , , , , , , , , ) scala> x.sample(false,0,).collect
res34: Array[Int] = Array() scala> x.sample(false,0.5,).collect
res35: Array[Int] = Array(, , ) scala> x.sample(false,0.5,).collect
res36: Array[Int] = Array(, , , , , ) scala> x.sample(false,0.5,).collect ##seed随机数生成器一样,它返回的结果是一样的
res37: Array[Int] = Array(, , , , , ) scala> x.sample(false,0.5,).collect
res38: Array[Int] = Array(, , , , , ) scala> x.sample(false,0.5,).collect
res39: Array[Int] = Array(, , , ) scala> x.sample(false,0.5,).collect
res40: Array[Int] = Array()
不放回, 每个元素被抽中的概率 [,]
放回,每个元素抽中的次数>=0 true 放回的抽样:
scala> x.sample(true,,).collect
res41: Array[Int] = Array() scala> x.sample(true,,).collect
res42: Array[Int] = Array() scala> x.sample(true,,).collect
res43: Array[Int] = Array(, , , , , , , , ) scala> x.sample(true,,).collect
res44: Array[Int] = Array(, , , , , , , , , , , , , , , , ) scala> x.sample(true,,).collect
res45: Array[Int] = Array(, , , , , , , , , , , , , , , , , , , , , , , , , , ) scala> x.sample(false,0.5,System.currentTimeMillis).collect
res46: Array[Int] = Array(, , , , , ) scala> x.sample(false,0.5,System.currentTimeMillis).collect
res47: Array[Int] = Array(, , , , ) scala> x.sample(false,0.5,System.currentTimeMillis).collect
res48: Array[Int] = Array(, , , , )
9. distinct([numTasks]))
作用:对源RDD进行去重后返回一个新的RDD。
scala> val x = sc.makeRDD(List(,,,,,))
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[] at makeRDD at <console>: scala> x.distinct().collect
res49: Array[Int] = Array(, )
10. coalesce(numPartitions)
作用:缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
def coalesce(numPartitions: Int, shuffle: Boolean = false, #shuffle是打乱重组
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
不产生shuffle想把它的分区数变多是不可以的,变少是可以的;
合并为一个分区不需要打乱重组,而拆分必须打乱重组
scala> val x = sc.makeRDD( to )
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[] at makeRDD at <console>: scala> x.getNumPartitions
res50: Int = scala> x.glom.collect
res51: Array[Array[Int]] = Array(Array(), Array(), Array(), Array(), Array(), Array(), Array(), Array()) scala> x.coalesce(, false).collect
res52: Array[Int] = Array(, , , ) scala> x.coalesce(, false).glom.collect
res53: Array[Array[Int]] = Array(Array(), Array(), Array(, )) scala> x.coalesce(, false).getNumPartitions
res54: Int = scala> x.coalesce(,false).getNumPartitions
res55: Int = scala> x.coalesce(,false).getNumPartitions
res56: Int = scala> x.coalesce(,true).getNumPartitions
res57: Int =
11.repartition(numPartitions)
改变分区数,肯定会产生shufle
作用:根据分区数,重新通过网络随机洗牌所有数据。
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
scala> x.repartition().getNumPartitions
res58: Int =
coalesce与repartition两个算子的作用以及区别与联系。(①都可改变rdd分区数;②repartition是coalesce的一种特殊情况,肯定产生shuffle)
1. coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
2. repartition实际上是调用的coalesce,进行shuffle。
12. sortBy(func,[ascending], [numTasks])
作用;使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。
/**
* Return this RDD sorted by the given key function.
*/
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
this.keyBy[K](f)
.sortByKey(ascending, numPartitions)
.values
}
scala> val x = sc.makeRDD(List(5,4,3,2))
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[79] at makeRDD at <console>:24 scala> x.sortBy(x=>x).collect
res60: Array[Int] = Array(2, 3, 4, 5) scala> x.sortBy(x=>x%2).collect
res61: Array[Int] = Array(4, 2, 5, 3)
13. pipe(command, [envVars])
作用:管道,针对每个分区,都执行一个shell脚本,返回输出的RDD。
注意:脚本需要放在Worker节点可以访问到的位置
/**
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: String): RDD[String] = withScope {
// Similar to Runtime.exec(), if we are given a single string, split it into words
// using a standard StringTokenizer (i.e. by spaces)
pipe(PipedRDD.tokenize(command))
}
[kris@hadoop101 spark-local]$ vim pipe.sh
#!/bin/bash
echo "AA"
while read LINE; do
echo ">>>"${LINE}
done
## while是默认读取整行,for是默认以空格切割
scala> val x = sc.makeRDD(1 to 4, 1)
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[90] at makeRDD at <console>:24 scala> x.collect
res63: Array[Int] = Array(1, 2, 3, 4) scala> x.pipe("./pipe.sh").collect
res65: Array[String] = Array(AA, >>>1, >>>2, >>>3, >>>4) scala> val x = sc.makeRDD(1 to 4, 3)
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[92] at makeRDD at <console>:24 scala> x.pipe("./pipe.sh").collect
res66: Array[String] = Array(AA, >>>1, AA, >>>2, AA, >>>3, >>>4) ##在每个分区前都会打印AA
scala> x.pipe("./pipe.sh").getNumPartitions
res45: Int = 8
scala> x.pipe("./pipe.sh").collect
res46: Array[String] = Array(AA, AA, >>>1, AA, AA, >>>2, AA, AA, >>>3, AA, AA, >>>4)
双Value类型交互
1 union(otherDataset) 并集
作用:对源RDD和参数RDD求并集后返回一个新的RDD
union没有shuffle
/**
* Return the union of this RDD and another one. Any identical elements will appear multiple
* times (use `.distinct()` to eliminate them).
*/
def union(other: RDD[T]): RDD[T] = withScope {
sc.union(this, other)
}
scala> val x = sc.makeRDD(1 to 4)
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[94] at makeRDD at <console>:24 scala> val y = sc.makeRDD(3 to 8)
y: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[95] at makeRDD at <console>:24 scala> x.union(y).collect
res67: Array[Int] = Array(1, 2, 3, 4, 3, 4, 5, 6, 7, 8) sql中的union会去重,union all是直接把结果拿过来做个并集;
2. subtract (otherDataset) 差集
作用:计算差的一种函数,去除两个RDD中相同的元素,不同的RDD将保留下来
x (1 to 4),y(3 to 8)
scala> x.subtract(y).collect
res68: Array[Int] = Array(1, 2)
scala> y.subtract(x).collect
res69: Array[Int] = Array(8, 5, 6, 7)
3. intersection(otherDataset) 交集
作用:对源RDD和参数RDD求交集后返回一个新的RDD
scala> x.intersection(y).collect
res70: Array[Int] = Array(3, 4) scala> y.intersection(x).collect
res71: Array[Int] = Array(3, 4)
4.cartesian(otherDataset) 笛卡尔积
作用:笛卡尔积(尽量避免使用)
scala> x.cartesian(y).collect
res72: Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (1,6), (1,7), (1,8), (2,3), (2,4), (2,5), (2,6), (2,7), (2,8), (3,3), (3,4), (3,5), (3,6), (3,7), (3,8), (4,3), (4,4), (4,5), (4,6), (4,7), (4,8))
5. zip(otherDataset)
作用:将两个RDD组合成Key/ Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都要相同(需要注意的点),否则会抛出异常。
scala> y.collect
res75: Array[Int] = Array(3, 4, 5, 6) scala> x.collect
res76: Array[Int] = Array(1, 2, 3, 4) scala> x.getNumPartitions
res77: Int = 8 scala> y.getNumPartitions
res78: Int = 8 scala> x.zip(y).collect
res79: Array[(Int, Int)] = Array((1,3), (2,4), (3,5), (4,6)) scala> y.zip(x).collect
res80: Array[(Int, Int)] = Array((3,1), (4,2), (5,3), (6,4)) 如果分区数不一样java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(8, 3)
如果元素数量不一样 Can only zip RDDs with same number of elements in each partition
Key-Value类型 键值对RDD,只有kv键值对RDD才有分区器这个概念
1 partitionBy案例
并不是一创建就会有默认分区器HashPartitioner,这里默认是多部分算子都是用的是Hash分区器
scala> val x = sc.textFile("./wc.txt").flatMap(_.split(" ")).map((_,))
x: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[] at map at <console>: scala> x.partitioner
res91: Option[org.apache.spark.Partitioner] = None scala> val x = sc.textFile("./wc.txt").flatMap(_.split(" ")).map((_,)).reduceByKey(_+_) ##这样才会产生默认分区器
x: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[] at reduceByKey at <console>: scala> x.partitioner
res92: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@)
分区器有HashPartitioner、RangePartitioner
abstract class Partitioner extends Serializable
class HashPartitioner(partitions: Int) extends Partitioner
作用:对pairRDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD,即会产生shuffle过程。
源码:
/**
* Return a copy of the RDD partitioned using the specified partitioner.
*/
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
if (self.partitioner == Some(partitioner)) {
self
} else {
new ShuffledRDD[K, V, V](self, partitioner)
}
}
必须是key,value类型的才有分区器
scala> x.partitioner
res92: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@) scala>
scala> x.getNumPartitions
res93: Int =
scala> val y = x.partitionBy(new org.apache.spark.HashPartitioner(4)) ##如果要改变RDD的分区数或分区器,都可以直接调用partitionBy; 重写分区;
y: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[] at partitionBy at <console>: scala> y.getNumPartitions
res96: Int =
如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD,即会产生shuffle过程。
我们传进去的分区器是自己的分区器就不会产生shuffle
scala> y.partitionBy(y.partitioner)
<console>:: error: type mismatch;
found : Option[org.apache.spark.Partitioner]
required: org.apache.spark.Partitioner
y.partitionBy(y.partitioner)
^ scala> y.partitionBy(y.partitioner.get)
res98: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[] at partitionBy at <console>: scala> res98.partitioner
res99: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@) scala> res98.getNumPartitions
res100: Int = scala> y.partitionBy(y.partitioner.get)
res101: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[] at partitionBy at <console>: scala> y.partitionBy(y.partitioner.get).collect
res102: Array[(String, Int)] = Array(("",), (spark,), (Hello,), (World,), (java,)) scala> y.partitionBy(y.partitioner.get).count
res103: Long =
scala> val x = sc.textFile("./wc.txt").flatMap(_.split(" ")).map((_,)).reduceByKey
reduceByKey reduceByKeyLocally scala> val x = sc.textFile("./wc.txt").flatMap(_.split(" ")).map((_,)).reduceByKey(_+_)
x: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[] at reduceByKey at <console>: scala> x.getNumPartitions
res105: Int = scala> x.partitionBy(new org.apache.spark.HashPartitioner()).collect
res106: Array[(String, Int)] = Array((Hello,), ("",), (World,), (java,), (spark,))
只有传进去的是当前对象的partition才不会产生shuffle; scala> y.partitionBy(new org.apache.spark.HashPartitioner(4)).count res104: Long = 5 链太长了它会走缓存的
2. reduceByKey(func, [numTasks])
在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。
scala> val rdd = sc.parallelize(List(("female", ), ("male", ), ("female", ), ("male", )))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[] at parallelize at <console>: scala> rdd.reduceByKey
reduceByKey reduceByKeyLocally scala> rdd.reduceByKey(_+_).collect
res108: Array[(String, Int)] = Array((female,), (male,)) scala> rdd.reduceByKey(_*_).collect
res110: Array[(String, Int)] = Array((female,), (male,)) scala> rdd.reduceByKey(_ max _).collect #先在分区内进行计算,最终分区之间也要做计算;
res111: Array[(String, Int)] = Array((female,), (male,))
3. groupByKey
作用:groupByKey也是对每个key进行操作,但只生成一个seq
scala> val rdd = sc.parallelize(List(("female", ), ("male", ), ("female", ), ("male", ), ("male", )))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[] at parallelize at <console>: scala> rdd.groupByKey().collect
res114: Array[(String, Iterable[Int])] = Array((female,CompactBuffer(, )), (male,CompactBuffer(, , )))
groupByKey与reduceByKey的区别 (reduceByKey有预聚合功能,效率比较高)
1. reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]。先在分区内部进行聚合,
2. groupByKey:按照key进行分组,直接进行shuffle。 有多少条数据直接进行shuffle,打乱重组直接发到下游
3. 开发指导:reduceByKey比groupByKey性能高,建议使用。但是需要注意是否会影响业务逻辑。
4. aggregateByKey
参数:(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U)
1. 作用:在kv对的RDD中,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。
2. 参数描述:
(1)zeroValue:给每一个分区中的每一种key一个初始值;
(2)seqOp:函数用于在每一个分区中用初始值逐步迭代value;##values与初始化迭代聚合
(3)combOp:函数用于合并每个分区中的结果。 #分区之间的聚合
3. 需求:创建一个pairRDD,取出每个分区相同key对应值的最大值,然后相加
源码:
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)] = self.withScope { // Serialize the zero value to a byte array so that we can get a new clone of it on each key
val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
val zeroArray = new Array[Byte](zeroBuffer.limit)
zeroBuffer.get(zeroArray) lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray)) // We will clean the combiner closure later in `combineByKey`
val cleanedSeqOp = self.context.clean(seqOp)
combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
cleanedSeqOp, combOp, partitioner)
}
scala> val rdd = sc.parallelize(List(("a",3), ("a",2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.getNumPartitions
res27: Int = 2 scala> rdd.collect
res5: Array[(String, Int)] = Array((a,3), (a,2), (c,4), (b,3), (c,6), (c,8))
scala> rdd.glom.collect
res28: Array[Array[(String, Int)]] = Array(Array((a,3), (a,2), (c,4)), Array((b,3), (c,6), (c,8))) scala> rdd.aggregateByKey(0)(_ max _, _+_).collect
res0: Array[(String, Int)] = Array((b,3), (a,3), (c,12)) 求平均值:
scala> rdd.aggregateByKey((0,0))((init,v)=>(init._1+v,init._2+1),(x,y)=>(x._1+y._1,x._2+y._2)).collect
res2: Array[(String, (Int, Int))] = Array((b,(3,1)), (a,(5,2)), (c,(18,3)))
求出每个相同key的和,相同key的个数; 和,次数
scala> val result = rdd.aggregateByKey((0,0))((init,v)=>(init._1+v,init._2+1),(x,y)=>(x._1+y._1,x._2+y._2))
//(0,0)代表和,次数; init是k的初始值,v是同一种k的v,和相加,次数也相加;init是元组类型,访问它的k即_1 和它的个数v
//每一个分区的每一种key--->初始值元组(0,0)
//每个分区之间做运行也需要传两个参数,(x,y)是每个分区之间做处理,每个x是代表元组类型,每个分区的每种key会得到一个元组;x,y都是k v对 result: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[3] at aggregateByKey at <console>:26
scala> result.collect
res31: Array[(String, (Int, Int))] = Array((b,(3,1)), (a,(5,2)), (c,(18,3))) scala> result.mapValues(x=>x._1/x._2).collect
res3: Array[(String, Int)] = Array((b,3), (a,2), (c,6)) scala> result.mapValues(x=>x._1.toDouble/x._2).collect
res4: Array[(String, Double)] = Array((b,3.0), (a,2.5), (c,6.0))
6 foldByKey案例
参数:(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
- 作用:aggregateByKey的简化操作,seqop和combop相同
scala> val rdd = sc.parallelize(List((,),(,),(,),(,),(,),(,)),)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[] at parallelize at <console>:
scala> rdd.collect
res39: Array[(Int, Int)] = Array((,), (,), (,), (,), (,), (,))
scala> rdd.glom.collect
res41: Array[Array[(Int, Int)]] = Array(Array((,), (,)), Array((,), (,)), Array((,), (,)))
计算相同key对应值的相加结果
scala> rdd.foldByKey()(_+_).collect
res42: Array[(Int, Int)] = Array((,), (,), (,))
求平均值:
scala> rdd.map(x => (x._1, (x._2, ))).foldByKey((, )) ((x, y) => (x._1+y._1, x._2+y._2)).mapValues(x => x._1.toDouble/x._2).collect
res44: Array[(Int, Double)] = Array((,7.0), (,3.0), (,3.0))
7 combineByKey[C] 案例
参数:(createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
- 作用:针对相同K,将V合并成一个集合。
- 参数描述:
(1)createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值
(2)mergeValue: 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并
(3)mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。
- 需求:创建一个pairRDD,根据key计算每种key的均值。(先计算每个key出现的次数以及可以对应值的总和,再相除得到结果)
scala> val input = sc.parallelize(Array(("a", ), ("b", ), ("a", ), ("b", ), ("a", ), ("b", )),)
input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[] at parallelize at <console>:
scala> val result = input.combineByKey((_,), (acc:(Int, Int), v) => (acc._1 + v, acc._2 + ), (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)).collect
result: Array[(String, (Int, Int))] = Array((b,(,)), (a,(,)))
scala> val result = input.combineByKey((_,), (acc:(Int, Int), v) => (acc._1 + v, acc._2 + ), (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)).mapValues(x => x._1.toDouble / x._2).collect
result: Array[(String, Double)] = Array((b,95.33333333333333), (a,91.33333333333333))
8 sortByKey([ascending], [numTasks]) 案例
1. 作用:在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
2. 需求:创建一个pairRDD,按照key的正序和倒序进行排序
scala> val rdd = sc.parallelize(Array((,"aa"),(,"cc"),(,"bb"),(,"dd")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[] at parallelize at <console>:
scala> rdd.collect
res58: Array[(Int, String)] = Array((,aa), (,cc), (,bb), (,dd))
scala> rdd.sortByKey().collect
res57: Array[(Int, String)] = Array((,dd), (,bb), (,aa), (,cc))
scala> rdd.sortByKey(false).collect
res59: Array[(Int, String)] = Array((,cc), (,aa), (,bb), (,dd))
9 mapValues案例
1. 针对于(K,V)形式的类型只对V进行操作
2. 需求:创建一个pairRDD,并将value添加字符串"|||"
scala> val rdd = sc.parallelize(Array((,"a"),(,"d"),(,"b"),(,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[] at parallelize at <console>: scala> rdd.mapValues(_+"|").collect
res60: Array[(Int, String)] = Array((,a|), (,d|), (,b|), (,c|))
10 join(otherDataset, [numTasks]) 案例
1. 作用:在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
2. 需求:创建两个pairRDD,并将key相同的数据聚合到一个元组。
scala> val rdd = sc.parallelize(Array((,"a"),(,"b"),(,"c"),(,"d")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[] at parallelize at <console>: scala> val rdd1 = sc.parallelize(Array((,),(,),(,),(,)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[] at parallelize at <console>: scala> rdd.collect
res68: Array[(Int, String)] = Array((,a), (,b), (,c), (,d)) scala> rdd1.collect
res69: Array[(Int, Int)] = Array((,), (,), (,), (,)) scala> rdd.leftOuterJoin(rdd1).collect
res70: Array[(Int, (String, Option[Int]))] = Array((,(a,Some())), (,(b,Some())), (,(c,Some())), (,(d,None))) scala> rdd.join(rdd1).collect
res71: Array[(Int, (String, Int))] = Array((,(a,)), (,(b,)), (,(c,))) scala> rdd.rightOuterJoin(rdd1).collect
res72: Array[(Int, (Option[String], Int))] = Array((,(Some(a),)), (,(Some(b),)), (,(Some(c),)), (,(None,)))
11 cogroup(otherDataset, [numTasks]) 案例
1. 作用:在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD
2. 需求:创建两个pairRDD,并将key相同的数据聚合到一个迭代器。
scala> val rdd = sc.parallelize(Array((,"a"),(,"b"),(,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[] at parallelize at <console>:
scala> val rdd1 = sc.parallelize(Array((,), (,), (,)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[] at parallelize at <console>:
scala> rdd.collect
res74: Array[(Int, String)] = Array((,a), (,b), (,c)) scala> rdd1.collect
res75: Array[(Int, Int)] = Array((,), (,), (,))
scala> rdd.cogroup(rdd1).collect
res73: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((,(CompactBuffer(a),CompactBuffer())), (,(CompactBuffer(b),CompactBuffer())), (,(CompactBuffer(c),CompactBuffer())))
求平均值
① def aggregateByKey[U: ClassTag]
(zeroValue: U) 给每一个分区中的每一种key一个初始值;
(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] = self.withScope seqOp函数用于在每一个分区中用初始值逐步迭代value; combOp:函数用于合并每个分区中的结果
combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),cleanedSeqOp, combOp, partitioner)
② def foldByKey 它是aggregateByKey的简化操作,seqop和combop相同
(zeroValue: V) 初始值要跟rdd 的 V一致;
(func: (V, V) => V): RDD[(K, V)] = self.withScope
combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v), cleanedFunc, cleanedFunc, partitioner)
③ def combineByKey[C](
createCombiner: V => C, //给每个分区内部的每一种key一个初始函数
mergeValue: (C, V) => C, //合并每个分区内部同种key的值,返回类型跟初始函数返回类型相同
mergeCombiners: (C, C) => C, //分区之间,相同的key的值进行聚合
partitioner: Partitioner, //分区器
mapSideCombine: Boolean = true, //是否进行预聚合
serializer: Serializer = null): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine, serializer)(null)
} ④ def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner) aggregateByKey,reduceByKey, foldByKey
def combineByKeyWithClassTag[C](
createCombiner: V => C, 给每个分区内部每一种k一个初始函数,得到一个初始值
mergeValue: (C, V) => C, 分区内部同种key不同值进行合并,第一个参数,是是一个函数的返回值
mergeCombiners: (C, C) => C, 分区之间,同种k的value合并
partitioner: Partitioner, 分区器
mapSideCombine: Boolean = true, 是否进行预聚合
serializer: Serializer = null)
scala> val rdd = sc.parallelize(List((,),(,),(,),(,),(,),(,)),) scala> rdd.glom.collect
res16: Array[Array[(Int, Int)]] = Array(Array((,), (,)), Array((,), (,)), Array((,), (,)))
①aggregateByKey
scala> rdd.aggregateByKey((,)) ((init,v) => (init._1+v, init._2+), (x,y) => (x._1+y._1, x._2+y._2)).collect
res15: Array[(Int, (Int, Int))] = Array((,(,)), (,(,)), (,(,)))
②foldByKey 初始值必须要跟rdd的v一样,
scala> rdd.map(x => (x._1, (x._2, 1))).foldByKey((,)) ((x,y) => (x._1+y._1, x._2+y._2)).collect
res30: Array[(Int, (Int, Int))] = Array((,(,)), (,(,)), (,(,)))
③combineByKey
scala> rdd.combineByKey(x => (x,), (acc:(Int,Int),newValue) => (acc._1+newValue, acc._2+),(x:(Int, Int), y:(Int, Int))=>(x._1+y._1, x._2+y._2)).collect
res29: Array[(Int, (Int, Int))] = Array((,(,)), (,(,)), (,(,)))
④reduceByKey
scala> rdd.map(x=>(x._1, (x._2, ))).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).collect
res35: Array[(Int, (Int, Int))] = Array((,(,)), (,(,)), (,(,)))
scala> rdd.map(x=>(x._1, (x._2, ))).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x => x._1.toDouble/x._2).collect
res6: Array[(Int, Double)] = Array((,7.0), (,3.0), (,3.0))
Action
action算子
行动算子都调了sc.runjob
1 reduce(func)案例
def reduce(f: (T, T) => T): T
1. 作用:通过func函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。
2. 需求:创建一个RDD,将所有元素聚合得到结果
scala> val rdd = sc.makeRDD( to , )
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[] at makeRDD at <console>:
scala> rdd.collect
res78: Array[Int] = Array(, , , , , , , , , ) scala> rdd.reduce(_+_)
res77: Int =
scala> rdd.map((_,)).collect
res79: Array[(Int, Int)] = Array((,), (,), (,), (,), (,), (,), (,), (,), (,), (,)) scala> rdd.map((_,)).reduce((x,y) => (x._1+y._1, x._2+y._2))
res82: (Int, Int) = (,)
scala> rdd.reduce(_ max _)
res84: Int =
scala> val rdd2 = sc.makeRDD(Array(("a",),("a",),("c",),("d",)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[] at makeRDD at <console>:
scala> rdd2.reduce((x,y) => (x._1 + y._1, x._2+y._2))
res86: (String, Int) = (caad,)
2 collect()案例 collect不要随意用,少用
1. 作用:在驱动程序中,以数组的形式返回数据集的所有元素。
2. 需求:创建一个RDD,并将RDD内容收集到Driver端打印
scala> val rdd = sc.parallelize( to )
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[] at parallelize at <console>: scala> rdd.collect
res87: Array[Int] = Array(, , , , , , , , , )
3 count()案例
1. 作用:返回RDD中元素的个数
2. 需求:创建一个RDD,统计该RDD的条数
scala> rdd.count
res88: Long =
4 first()案例 --底层调用了take
1. 作用:返回RDD中的第一个元素
2. 需求:创建一个RDD,返回该RDD中的第一个元素
scala> rdd.first
res89: Int =
5 take(n)案例 take 和collect一样都是把数据弄到driver内存里,慎用
1. 作用:返回一个由RDD的前n个元素组成的数组
2. 需求:创建一个RDD,统计该RDD的条数
scala> val rdd = sc.parallelize(Array(,,,,,))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[] at parallelize at <console>: scala> rdd.take()
res91: Array[Int] = Array(, , )
6 takeOrdered(n)案例
1. 作用:返回该RDD排序后的前n个元素组成的数组
2. 需求:创建一个RDD,统计该RDD的条数
scala> rdd.takeOrdered()
res92: Array[Int] = Array(, , ) scala> rdd.takeOrdered()
res93: Array[Int] = Array(, , , , , )
7 aggregate案例
1. 参数:(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)
2. 作用:aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。
3. 需求:创建一个RDD,将所有元素相加得到结果
aggregate()分区内部元素一个初始值()分区内部元素初始值进行合并()分区之间,同种k的value合并
aggregate(0)(_+_, _+_)求和
scala> var rdd1 = sc.makeRDD( to ,)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[] at makeRDD at <console>:
scala> rdd1.glom.collect
res99: Array[Array[Int]] = Array(Array(, , , , ), Array(, , , , ))
scala> rdd1.aggregate()(_ max _, _+_)
res97: Int =
scala> rdd1.aggregate()(_+_, _+_)
res101: Int =
aggregateByKey和aggregate的区别和联系:①参数个数一样; ②分区内部和分区之间聚合对象不一样;
aggregateByKey:对同种key的值; aggregate是对rdd的元素
8 fold(num)(func)案例
1. 作用:折叠操作,aggregate的简化操作,seqop和combop一样。
2. 需求:创建一个RDD,将所有元素相加得到结果
scala> rdd1.fold()(_+_)
res102: Int = scala> rdd1.fold()(_ max _)
res103: Int =
scala中x.reduce(_+_)
9 saveAsTextFile(path)
作用:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
scala> rdd.saveAsTextFile("./textFile.txt")
10 saveAsSequenceFile(path)
作用:将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
11 saveAsObjectFile(path)
作用:用于将RDD中的元素序列化成对象,存储到文件中。
12 countByKey()案例
1. 作用:针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
2. 需求:创建一个PairRDD,统计每种key的个数
scala> val rdd = sc.parallelize(List((,),(,),(,),(,),(,),(,)),)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[] at parallelize at <console>:
scala> rdd.glom.collect
res107: Array[Array[(Int, Int)]] = Array(Array((,), (,)), Array((,), (,)), Array((,), (,)))
scala> rdd.countByKey
res106: scala.collection.Map[Int,Long] = Map( -> , -> , -> )
13 foreach(func)案例
1. 作用:在数据集的每一个元素上,运行函数func进行更新。
2. 需求:创建一个RDD,对每个元素进行打印
scala> var rdd = sc.makeRDD( to ,)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[] at makeRDD at <console>:
scala> rdd.foreach(println(_))
def foreachPartition(f: Iterator[T] => Unit): Unit = withScope
可以控制它的链接数,(可写数据库连接池等);每个记录创建一次;(跟map和mapPartition类似)
rdd.foreachPartition(x => {
x.foreach(println(_))
})