[1.2]Spark core编程(一)之RDD总论与创建RDD的三种方式

时间:2021-09-10 21:15:53

参考

DT大数据梦工厂
Spark官网

场景

  • RDD的理解
    一、RDD是基于工作集的应用抽象;是分布式、函数式编程的抽象。
    MapReduce:基于数据集的处理。两者的共同特征:位置感知(具体数据在哪里)、容错、负载均衡。
    基于数据集的处理:从物理存储设备上加载数据,然后操作数据,写入物理存储设备。eg、Hadoop MapReduce
    不适应场景:
    1、不适合于大量的迭代
    2、不适合于交付式查询
    3、基于数据流的方式,不能够复用曾经的结果或者中间计算结果。
    二、RDD的”弹性”(Resilient)
    1、自动的进行内存与磁盘数据存储的切换
    2、基于Lineage的高效容错
    3、Task如果失败会自动进行特定次数的重试
    4、Stage如果失败会自动进行特定次数的重试,而且只会计算失败的分片
    5、checkpoint和persist:对于长链接的操作,把中间的数据放到磁盘上去
    6、数据分片的高度弹性(提高、降低并行度)
    7、数据调度弹性:DAG TASK和资源管理无关

  • 创建RDD的几种方式
    1、基于程序中的集合创建RDD-作用:测试
    2、基于本地文件创建RDD-作用:大数据量的测试
    3、基于HDFS创建RDD-作用:生产环境最常用的RDD创建方式
    4、基于DB、NoSQL(例如HBase)、S3、基于数据流创建RDD

实验

package main.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
* 创建RDD的三种方式初体验
*/

object RDDBaseOnCollection {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDDBaseOnCollection")
val sc = new SparkContext(conf)

/*
* 1、从scala集合中创建RDD
* 计算:1+2+3+...+100
*/

val nums = 1 to 100
val rdd = sc.parallelize(nums)
val sum = rdd.reduce(_+_)
println("sum:"+sum)

/*
* 2、从本地文件系统创建RDD
* 计算 people.json 文件中字符总长度
*/

val rows = sc.textFile("file:///home/hadoop/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json")
val length = rows.map(row=>row.length()).reduce(_+_)
println("total chars length:"+length)

/*
* 3、从HDFS创建RDD(lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at textFile at)
* 计算 hive_test 文件中字符长度
*/

val lines = sc.textFile("hdfs://112.74.21.122:9000/user/hive/warehouse/hive_test")
println( lines.map(row=>row.length()).reduce(_+_))
}
}

总结

一、“ RDD(resilient distributed dataset) is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.”
二、RDD是基于工作集的应用抽象;是分布式、函数式编程的抽象。