Spark 学习条记

时间:2021-11-15 04:18:31

1)通过 RDD 的调集数据布局,创建 RDD

sc.parallelize(List(1,2,3),2) 此中第二个参数代表的是整个数据,分为 2 个 partition,默认情况会讲数据集进行等分,注意不是两个副本

2)通过文件来读取

sc.textFile("file.txt")

sc.sequenceFile("file.txt") sequeceFile 是 HDFS 一些数据布局

文件读取的位置,容易孕育产生奇异,好比一下几种形式:

1)、inputRdd = sc.textFile("/data/input")

2)、inputRdd = sc.textFile("file:///data/input")

3)、inputRdd = sc.textFile("hdfs:///data/input")

4)、inputRdd = sc.textFile("hdfs://namenode:8020/data/input")

第一种 /data/input 具体读取的是本地和 hdfs上的文件,要依赖于上下文环境,driver 的配置,driver 是 local 的模式就读的本地文件,driver 是 cluster 模式的且在conf里面配置了 hdfs 的 namenode 地点的,则是读取的长途的文件

第二种 file:///data/input 是强制 executor 读取本地的数据,这样完全是为了本地测试用的,如果是在集群上运行功课,executor 具体运行的物理机器的相应目录未必存在

二、List 调集 RDD 常见的 Transformation 操纵

1、map:1 对 1 进行映射

2、filter:过滤

3、flatMap:1 对 多进行映射

举个例子

listRdd = sc.parallelize(List(1,2,3),1)

nums.flatMap(x=>1 to x) // {1,2,3,2,3,3}

三、key-value 调集的RDD操纵

val listRdd = sc.parallelize(List((“cat”,1),("dog",1),("cat",2)))

listRdd.reduceByKey(_+_) // => {(cat,3),(dog,1)}

listRdd.groupByKey() // => {(cat,Seq(1,2),(dog,Seq(1))}

reduceByKey 自动在map端进行本地的 combine 操纵

四、RDD 常见的 Action 操纵

Action 操纵,分为,内存聚类操纵,存储类操纵

内存堆积类操纵是讲漫衍式的数据集 汇聚到 driver 运行端,或者汇聚完之后进行聚合运算

1、collect() // 将 RDD 生存在本地调集收集到本地, 此“本地” 是只 driver 运行的机器,如何 RDD 很大,,很可能会把 driver 端给撑爆了

2、take()

3、count()

4、reduce(_+_)

存储类操纵是通过 driver 倡议分袂进行存储

1、saveAsTextFile

2、saveAsSequenceFile

五、Spark RDD的 Join 操纵

Join 操纵必需是 针对 2个或多个 key-value 的 List 调集

join 和 cogroup 的区别

如何控制 reduceByKey、groupByKey、join 的并行度

通过参数来改削

1、reduceByKey(_+_,5)

2、groupByKey(5)

通过改削默认的参数来配置

spark.default.parallelism

可以这样来理解问 reduce 的数量的控制,道理我猜是通过 hash 讲差此外key进行分桶

hadoop 的 reduce 默认是启动一个 task,spark 默认的 reduce 真个聚合操纵默认和前一个阶段的并发度是一样的

六、spark 的 accumulator 和 广播变量(HttpBroadCast和TorrentBroadcast)

非常类似于 hadoop 里面的 counter 和 漫衍式缓存,只是漫衍式缓存是通过文件的方法

七、RDD 的 Cache

分析下以下2段代码的区别:

// 有 cache 函数

val data = sc.textFile("hdfs://nn:8020/input")
data.cache()
data.filter(_.startWith("error")).count()
data.filter(_.startWith("hadoop")).count()
data.filter(_.startWith("hbase")).count()

// 无 cache 函数
val data = sc.textFile("hdfs://nn:8020/input")
data.filter(_.startWith("error")).count()
data.filter(_.startWith("hadoop")).count()
data.filter(_.startWith("hbase")).count()