Spark中三大数据结构:RDD; 广播变量: 分布式只读共享变量; 累加器:分布式只写共享变量; 线程和进程之间
1.RDD中的函数传递
自己定义一些RDD的操作,那么此时需要主要的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的。
传递一个方法
class Search(query: String){ // extends Serializable
//过滤出包含字符串的数据
def isMatch(s: String): Boolean = {
s.contains(query)
}
//过滤出包含字符串的RDD
def getMatch1(rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
//过滤出包含字符串的RDD
def getMatch2(rdd: RDD[String]): RDD[String] = {
val str: String = this.query //将类变量赋值给局部变量str,即可序列化;
rdd.filter(x => x.contains(str)) }
}
object TestSearch {
def main(args: Array[String]): Unit = {
//初始化配置信息以及 sc
val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf) val rdd = sc.makeRDD(List("kris", "Baidu", "Google")) //创建一个RDD
val search = new Search("ris") //创建一个search对象
println("===============")
//运用第一个过滤函数并打印结果;
val res: RDD[String] = search.getMatch1(rdd) //java.io.NotSerializableException: com.atguigu.spark.Search
//class Search(query: String) extends Serializable
res.foreach(println(_))
}
}
//过滤出包含字符串的RDD
def getMatch1 (rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
在这个方法中所调用的方法isMatch()是定义在Search这个类中的,实际上调用的是this. isMatch(),this表示Search这个类的对象,程序在运行过程中需要将Search对象序列化以后传递到Executor端。
解决方案 使类继承scala.Serializable即可。 class Search() extends Serializable{...}
传递一个属性
//初始化sc
val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf) val rdd = sc.makeRDD(List("kris", "Baidu", "Google"))
val search = new Search("ris")
println("===============") val res2: RDD[String] = search.getMatch2(rdd)
res2.foreach(println(_))
rdd.filter(x => x.contains(query))
在这个方法中所调用的方法query是定义在Search这个类中的字段,实际上调用的是this. query,this表示Search这个类的对象,程序在运行过程中需要将Search对象序列化以后传递到Executor端。
解决方案:将类变量query赋值给局部变量如上所示;
2. RDD依赖关系
Lineage 血统
RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
RDD在Lineage依赖方面分为两种Narrow Dependencies与Wide Dependencies用来解决数据容错时的高效性。
Narrow Dependencies是指父RDD的每一个分区最多被一个子RDD的分区所用,
表现为一个父RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区,也就是说一个父RDD的一个分区不可能对应一个子RDD的多个分区。
Wide Dependencies是指子RDD的分区依赖于父RDD的多个分区或所有分区,
也就是说存在一个父RDD的一个分区对应一个子RDD的多个分区。对与Wide Dependencies,这种计算的输入和输出在不同的节点上,lineage方法对与输入节点完好,而输出节点宕机时,通过重新计算,这种情况下,这种方法容错是有效的,否则无效,因为无法重试,需要向上其祖先追溯看是否可以重试(这就是lineage,血统的意思),Narrow Dependencies对于数据的重算开销要远小于Wide Dependencies的数据重算开销。
scala> val x = sc.textFile("./wc.txt").flatMap(_.split(" ")).map((_,))
x: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[] at map at <console>:
scala> x.toDebugString
res112: String =
() MapPartitionsRDD[] at map at <console>: [] ## new MapPartitionsRDD(
| MapPartitionsRDD[] at flatMap at <console>: [] ##new MapPartitionsRDD
| ./wc.txt MapPartitionsRDD[] at textFile at <console>: []
| ./wc.txt HadoopRDD[] at textFile at <console>: [] scala> x.dependencies ##可以看到它的依赖OneToOneDependency窄依赖
res113: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@1af408a8)
scala> val y = x.reduceByKey(_+_)
y: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[] at reduceByKey at <console>: scala> y.toDebugString
res114: String =
() ShuffledRDD[] at reduceByKey at <console>: []
+-() MapPartitionsRDD[] at map at <console>: []
| MapPartitionsRDD[] at flatMap at <console>: []
| ./wc.txt MapPartitionsRDD[] at textFile at <console>: []
| ./wc.txt HadoopRDD[] at textFile at <console>: [] scala> y.dependencies
res115: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@5b7e9456) ##宽依赖,产生shuffle
跨节点传输数据就产生shuffle
DAG
DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。
3. 任务划分(重点)
RDD任务切分中间分为:Application、Job、Stage和Task
1)Application:初始化一个SparkContext即生成一个Application; 提交一个jar包就是Application(一个Application可以 有多个job)
2)Job:一个Action算子就会生成一个Job ;
3)Stage:根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。
4)Task:Stage是一个TaskSet,将Stage划分的结果发送到不同的Executor执行即为一个Task。
注意:Application->Job->Stage->Task每一层都是1对n的关系。
有多少个task,由你当前stage的最后一个RDD的分区数决定
窄依赖分两种:OneToOneDependency和 RangeDependency(如两个分区union两个分区 => 四个分区) NarrowDependency窄依赖
Union会产生窄依赖(查看源码);map也是窄依赖; ReduceByKey是宽依赖,shuffledRDD---shufleDependency
scala> sc.makeRDD( to ).map((_,)).reduceByKey(_+_).collect
res3: Array[(Int, Int)] = Array((,), (,), (,), (,), (,), (,), (,), (,))
在宽依赖算子reduceByKey那切一刀;
scala> sc.makeRDD( to ).map((_,)).reduceByKey(_+_).map((_,)).reduceByKey(_+_).collect
res4: Array[((Int, Int), Int)] = Array(((,),), ((,),), ((,),), ((,),), ((,),), ((,),), ((,),), ((,),))
两个reduceByKey宽依赖,分成了3个stage;
RDD分区数对应Task数;
scala> sc.makeRDD( to ,).map((_,)).coalesce(,false).reduceByKey(_+_).coalesce(,false).collect
res5: Array[(Int, Int)] = Array((,), (,), (,), (,), (,), (,), (,), (,))
4个分区==> map算子 4个分区 ==> 经过coalesce转换为3个分区 reduceByKey宽依赖算子切分了两个stage coalesce产生 2个分区
可以推断得到产生了1个Application,2个stage,第一个stage产生了3个task;第二个stage产生了2个task;
3个task
2个task
4. 键值对RDD数据分区
Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数
注意:(1)只有Key-Value类型的RDD才有分区器的,非Key-Value类型的RDD分区的值是None
(2)每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。
获取RDD分区:可以通过使用RDD的partitioner 属性来获取 RDD 的分区方式。它会返回一个 scala.Option 对象, 通过get方法获取其中的值。
scala> val pairs = sc.parallelize(List((,),(,),(,)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[] at parallelize at <console>:
scala> pairs.partitioner 查看RDD的分区器
res5: Option[org.apache.spark.Partitioner] = None
scala> import org.apache.spark.HashPartitioner 导入HashPartitioner类
import org.apache.spark.HashPartitioner scala> val partitioned = pairs.partitionBy(new HashPartitioner()) //使用HashPartitioner对RDD进行重新分区
partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[] at partitionBy at <console>: scala> partitioned.partitioner
res6: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@)
Hash分区
HashPartitioner分区的原理:对于给定的key,计算其hashCode,并除以分区的个数取余,如果余数小于0,则用余数+分区的个数(否则加0),最后返回的值就是这个key所属的分区ID。
源码:
def getPartition(key: Any): Int = key match {
case null => 0 //key为null直接进0号分区;
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
rdd.partitionBy(new org.apache.spark.HashPartitioner(7) .partitioner查看分区器
Ranger分区
HashPartitioner分区弊端:可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据。
RangePartitioner作用:将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大,但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。实现过程为:
第一步:先从整个RDD中抽取出样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[KEY]类型的数组变量rangeBounds;
第二步:判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中的分区id下标;该分区器要求RDD中的KEY类型必须是可以排序的
自定义分区
要实现自定义的分区器,你需要继承 org.apache.spark.Partitioner 类并实现下面三个方法。
(1)numPartitions: Int:返回创建出来的分区数。
(2)getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions-1)。
(3)equals():Java 判断相等性的标准方法。这个方法的实现非常重要,Spark 需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样 Spark 才可以判断两个 RDD 的分区方式是否相同。
class MyPartitioner(partitions: Int) extends Partitioner{ //传入参数,可指定分区
override def numPartitions: Int = partitions
override def getPartition(key: Any): Int = { //只能根据key进行分区;而hadoop的分区既可根据key也可根据value (Partitioner.java)
key.toString.toInt % partitions
//0 //也可以写0,不管传进来什么key,数据只进入0号分区;
}
}
测试
object TextPartition {
def main(args: Array[String]): Unit = {
//初始化sc
val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf) val rdd: RDD[(Int, Int)] = sc.makeRDD(List((1,1), (2,1), (3,1), (4,1)))
val rdd2: RDD[(Int, Int)] = rdd.partitionBy(new MyPartitioner(2))
//val rdd: RDD[String] = sc.textFile("E:\\wc.txt")
rdd2.saveAsTextFile("E:\\output")
sc.stop()
}
}
结果是2个文件(2个分区,数据进入了2个分区)
5. 数据读取与保存
Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。
文件格式分为:Text文件、Json文件、Csv文件、Sequence文件以及Object文件;
文件系统分为:本地文件系统、HDFS、HBASE以及数据库。
文件类数据读取与保存
Text文件
1)数据读取:textFile(String)
scala> val hdfsFile = sc.textFile("hdfs://hadoop101:9000/fruit.txt")
hdfsFile: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/fruit.txt MapPartitionsRDD[21] at textFile at <console>:24
2)数据保存: saveAsTextFile(String)
scala> hdfsFile.saveAsTextFile("/fruitOut")
Json文件
如果JSON文件中每一行就是一个JSON记录,那么可以通过将JSON文件当做文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析。
注意:使用RDD读取JSON文件处理很复杂,同时SparkSQL集成了很好的处理JSON文件的方式,所以应用中多是采用SparkSQL处理JSON文件。
scala> import scala.util.parsing.json.JSON
import scala.util.parsing.json.JSON scala> sc.textFile("/opt/module/spark/spark-local/examples/src/main/resources/people.json").collect
res13: Array[String] = Array({"name":"Michael"}, {"name":"Andy", "age":}, {"name":"Justin", "age":})
scala> val y = sc.textFile("/opt/module/spark/spark-local/examples/src/main/resources/people.json")
y: org.apache.spark.rdd.RDD[String] = /opt/module/spark/spark-local/examples/src/main/resources/people.json MapPartitionsRDD[] at textFile at <console>:
scala> y.map(JSON.parseFull).collect
res19: Array[Option[Any]] = Array(Some(Map(name -> Michael)), Some(Map(name -> Andy, age -> 30.0)), Some(Map(name -> Justin, age -> 19.0)))
Sequence文件
SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。Spark 有专门用来读取 SequenceFile 的接口。在 SparkContext 中,可以调用 sequenceFile[keyClass, valueClass](path)。
注意:SequenceFile文件只针对PairRDD
scala> val rdd = sc.parallelize(Array((,),(,),(,)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[] at parallelize at <console>:
scala> rdd.saveAsSequenceFile("./output/seque") //
scala> sc.sequenceFile[Int,Int]("/opt/module/spark/spark-local/output/seque").collect //必须加泛型,不然会报错 ambiguous implicit values:
res22: Array[(Int, Int)] = Array((,), (,), (,))
对象文件
对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFile[k,v](path) 函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用saveAsObjectFile() 实现对对象文件的输出。因为是序列化所以要指定类型。
scala> val rdd = sc.parallelize(Array(,,,))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[] at parallelize at <console>:
scala> rdd.saveAsObjectFile("./output/object")
scala> sc.objectFile[(Int)]("/opt/module/spark/spark-local/output/object").collect //也要加泛型
res25: Array[Int] = Array(, , , )
文件系统类数据读取与保存
①HDFS
Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持.另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套创建操作接口.对于外部存储创建操作而言,hadoopRDD和newHadoopRDD是最为抽象的两个函数接口,主要包含以下四个参数.
1)输入格式(InputFormat): 制定数据输入的类型,如TextInputFormat等,新旧两个版本所引用的版本分别是org.apache.hadoop.mapred.InputFormat和org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)
2)键类型: 指定[K,V]键值对中K的类型
3)值类型: 指定[K,V]键值对中V的类型
4)分区值: 指定由外部存储生成的RDD的partition数量的最小值,如果没有指定,系统会使用默认值defaultMinSplits。
② MySQL
支持通过Java JDBC访问关系型数据库。需要通过JdbcRDD进行,
从Mysql读取数据
//初始化
val conf = new SparkConf().setAppName("WorldCount").setMaster("local[*]")
val sc = new SparkContext(conf) //定义连接mysql的参数
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://hadoop101:3306/rdd"
val userName = "root"
val password = "123456"
//读取 //创建JdbcRDD
val rdd: JdbcRDD[(Int, String)] = new JdbcRDD(sc, () => {
Class.forName(driver)
DriverManager.getConnection(url, userName, password)},
"select * from test where id >= ? and id <= ?;",
1, 4, 2,
r => (r.getInt(1), r.getString(2))
)
println(rdd.count())
rdd.foreach(println(_)) sc.stop()
从rdd写入mysql
//rdd数据输出到mysql
//写入数据,foreachPartition是每个分区创建一个连接
val rdd: RDD[(Int, String)] = sc.makeRDD(List((5, "Amazon")))
rdd.foreachPartition(x => {
Class.forName(driver)
val conn: Connection = DriverManager.getConnection(url, userName, password)
x.foreach(x => {
val id: Int = x._1
val name: String = x._2
val statement: PreparedStatement = conn.prepareStatement("insert into test (id, name) values(?, ?)")
statement.setInt(1, id)
statement.setString(2, name)
statement.execute()
})
} )
sc.stop()
③ Hbase
从HBase读取数据
由于 org.apache.hadoop.hbase.mapreduce.TableInputFormat 类的实现,Spark 可以通过Hadoop输入格式访问HBase。这个输入格式会返回键值对数据,其中键的类型为org. apache.hadoop.hbase.io.ImmutableBytesWritable,而值的类型为org.apache.hadoop.hbase.client.
Result。
//初始化sc
val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf) //从hbase表读取数据
val configuration: Configuration = HBaseConfiguration.create()
configuration.set("hbase.zookeeper.quorum", "hadoop101,hadoop102,hadoop103")
configuration.set(TableInputFormat.INPUT_TABLE, "fruit") val rdd = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
rdd.foreach(x => {
val cells: Array[Cell] = x._2.rawCells()
cells.foreach(cell => {
val rowkey: String = Bytes.toString(CellUtil.cloneRow(cell))
val family: String = Bytes.toString(CellUtil.cloneFamily(cell))
val column: String = Bytes.toString(CellUtil.cloneQualifier(cell))
val value: String = Bytes.toString(CellUtil.cloneValue(cell))
println(s"$rowkey $family $column $value")
})
})
sc.stop()
往HBase写入
//初始化sc
val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
//rdd数据写入到hbase表
val rdd: RDD[(String, String, String, String)] = sc.makeRDD(List(("1004", "info", "name", "pineapple")))
val rdd2 = rdd.map(x => {
val put: Put = new Put(Bytes.toBytes(x._1))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(x._4))
(new ImmutableBytesWritable(), put)
}) //创建配置
val configuration: Configuration = HBaseConfiguration.create()
configuration.set("hbase.zookeeper.quorum", "hadoop101,hadoop102,hadoop103")
configuration.set(TableOutputFormat.OUTPUT_TABLE, "fruit")
//设置OutputFormat类型
val job: Job = Job.getInstance(configuration)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result]) rdd2.saveAsNewAPIHadoopDataset(job.getConfiguration) sc.stop()
6. 累加器
代码程序都在Driver,序列化传到Executor节点上去执行;
val rdd = sc.makeRDD(1 to 4) //创建RDD
var a = 0
rdd.foreach(x => {
a += 1
})
println(a) //打印的a是driver端的,而不是executor端的;
执行,输出的却是0; 代码在Driver端,具体执行是在Executor,executor中会有副本a = 0,每个节点的executor都各自有各自的副本,在自己节点上修改
累加器用来对信息进行聚合,通常在向 Spark传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。如果我们想实现所有分片处理时更新共享变量的功能,那么累加器可以实现我们想要的效果。
//初始化sc
val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf) val rdd = sc.makeRDD(1 to 4) //创建RDD
val acc: LongAccumulator = sc.longAccumulator //它有一个初始值
/* 源码: class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {
private var _sum = 0L
private var _count = 0L*/
println("初始值: "+ acc.value) //
rdd.foreach(x => {
acc.add(1)
})
println(acc.value) //
自定义累加器
自定义累加器类型的功能在1.X版本中就已经提供了,但是使用起来比较麻烦,在2.0版本后,累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2来提供更加友好的自定义类型累加器的实现方式。实现自定义类型累加器需要继承AccumulatorV2并覆写要求的方法。
copy每个节点都要copy Driver端的;每个节点再对它进行重置reset;add在自己各自节点操作;merge是其他Executor节点中的和Driver端的进行合并;
//自定义一个类:
class MyAcc1 extends AccumulatorV2[Int, Int]{
private var init = 0 //判断是否为空
override def isZero: Boolean = init == 0
//复制
override def copy(): AccumulatorV2[Int, Int] = {
val acc: MyAcc1 = new MyAcc1
acc.init = this.init
acc
} override def reset(): Unit = { //重置
init = 0
} override def add(v: Int): Unit = { //累加
init += v
} override def merge(other: AccumulatorV2[Int, Int]): Unit = { //合并
init += other.value
} override def value: Int = init //返回值
}
调用自定义累加器
object TestAcc {
def main(args: Array[String]): Unit = {
//初始化sc
val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf) val rdd: RDD[Range.Inclusive] = sc.makeRDD(1 to 4) //创建RDD
val acc: MyAcc1 = new MyAcc1 //创建自定义累加器对象
//注册累加器, 在Driver中sc
sc.register(acc, "MyAcc1") rdd.foreach(x => { //在行动算子中对累加器的值进行修改
acc.add(1)
println(x) //2 1 4 3
})
println("累加器:" + acc.value) //打印累加器的值 累加器:4
sc.stop() //关闭SparkContext }
}
广播变量(调优策略-不用它也可以实现功能,作为调优使用)
* Broadcast a read-only variable to the cluster, returning a
* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
* The variable will be sent to each cluster only once.
广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,甚至是机器学习算法中的一个很大的特征向量,广播变量用起来都很顺手。 在多个并行操作中使用同一个变量,但是 Spark会为每个任务分别发送。
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(35)
scala> broadcastVar.value
res33: Array[Int] = Array(1, 2, 3)
使用广播变量的过程如下:
(1) 通过对一个类型T的对象调用SparkContext.broadcast创建出一个Broadcast[T]对象。任何可序列化的类型都可以这么实现。
(2) 通过value属性访问该对象的值(在Java中为value()方法)。
(3) 变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。
object TestBroadcast {
def main(args: Array[String]): Unit = {
//初始化sc
val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR") val rdd: RDD[String] = sc.makeRDD(List("kris", "alex", "smile"))
val temp = "ris" //
sc.broadcast(temp) //广播变量是Driver给每个Executor发一份,而不是每个Task(如果不是广播变量就会给每个task发送),每个task共享;减小数据传输量 ; /**
* 累加器和广播变量的区别:
* 都是共享变量
* 累加器只能写
* 广播变量只能读
*/
val result = rdd.filter(x => {
x.contains(temp)
})
result.foreach(println(_)) //kris }
}
SparkCore | Rdd| 广播变量和累加器的更多相关文章
-
Spark(三)RDD与广播变量、累加器
一.RDD的概述 1.1 什么是RDD RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变.可分区.里面的元素可 ...
-
Spark RDD持久化、广播变量和累加器
Spark RDD持久化 RDD持久化工作原理 Spark非常重要的一个功能特性就是可以将RDD持久化在内存中.当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内 ...
-
Spark——DataFrames,RDD,DataSets、广播变量与累加器
Spark--DataFrames,RDD,DataSets 一.弹性数据集(RDD) 创建RDD 1.1RDD的宽依赖和窄依赖 二.DataFrames 三.DataSets 四.什么时候使用Dat ...
-
广播变量、累加器、collect
广播变量.累加器.collect spark集群由两类集群构成:一个驱动程序,多个执行程序. 1.广播变量 broadcast 广播变量为只读变量,它由运行sparkContext的驱动程序创建后发送 ...
-
Spark(八)【广播变量和累加器】
目录 一. 广播变量 使用 二. 累加器 使用 使用场景 自定义累加器 在spark程序中,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的 ...
-
Spark 广播变量和累加器
Spark 的一个核心功能是创建两种特殊类型的变量:广播变量和累加器 广播变量(groadcast varible)为只读变量,它有运行SparkContext的驱动程序创建后发送给参与计算的节点.对 ...
-
【Spark-core学习之七】 Spark广播变量、累加器
环境 虚拟机:VMware 10 Linux版本:CentOS-6.5-x86_64 客户端:Xshell4 FTP:Xftp4 jdk1.8 scala-2.10.4(依赖jdk1.8) spark ...
-
【Spark篇】---Spark中广播变量和累加器
一.前述 Spark中因为算子中的真正逻辑是发送到Executor中去运行的,所以当Executor中需要引用外部变量时,需要使用广播变量. 累机器相当于统筹大变量,常用于计数,统计. 二.具体原理 ...
-
Spark共享变量(广播变量、累加器)
转载自:https://blog.csdn.net/Android_xue/article/details/79780463 Spark两种共享变量:广播变量(broadcast variable)与 ...
随机推荐
-
FindWindowEx用法
函数原型:HWND FindWindowEx(HWND hwndParent,HWND hwndChildAfter,LPCTSTR lpszClass,LPCTSTR lpszWindow): 参数 ...
-
Linux下JDK、Tomcat
1.JDK的安装 1. 下载JDK 先查看Linux系统是多少位(32位/64位):getconf LONG_BIT.再从JDK官网(http://www.oracle.com/technetw ...
-
Bitmap二次采样(处理图片过大的问题)
private Bitmap createImageThumbnail(String filePath, int newHeight, int newWidth) { BitmapFactory.Op ...
-
Android笔记:管理所有活动
以关闭所有活动为例 public class ActivityCollector { public static List<Activity> activities = new Array ...
-
养只爬虫当宠物(Node.js爬虫爬取58同城租房信息)
先上一个源代码吧. https://github.com/answershuto/Rental 欢迎指导交流. 效果图 搭建Node.js环境及启动服务 安装node以及npm,用express模块启 ...
-
C#扫描仪编程、条形码识别编程资料
扫描仪编程资料:http://www.cnblogs.com/wubh/archive/2011/11/07/2239178.html 图片条形码识别资料:http://www.codeproject ...
-
ubuntu 安装python,easy_install和pip
ubuntu12.04默认安装的python为 ms@ubuntums:~$ pythonPython 2.7.3 (default, Aug 1 2012, 05:16:07) 我需要用python ...
-
cowboy rest
REST Flowcharts 这章节将通过一些列不同的流程图来介绍REST处理状态机. 一个请求主要有四条路线,一个是方法OPTIONS. 一个是方法GET和HEAD.一个是PUT.POST和PAT ...
-
Visual Studio 2013 发布正式版
SELECT COUNT(1) 和SELECT COUNT(*)哪个性能好? Visual Studio 2013 发布正式版及使用感受 (注意:文末我添加了一个小节,用来更新最新的一些使用感受.) ...
-
DataPipeline CTO陈肃:从ETL到ELT,AI时代数据集成的问题与解决方案
引言:2018年7月25日,DataPipeline CTO陈肃在第一期公开课上作了题为<从ETL到ELT,AI时代数据集成的问题与解决方案>的分享,本文根据陈肃分享内容整理而成. 大家好 ...