一.图解
二.原理
将executor端的数据计算之后,最后返回到driver端。
一般是累加数字,也可以累加String类型,但是需要自定义。
注意:累加器只能在driver端定义,初始化,在executor端更新使用,在executor端获取值。
在executor中不能accumulator.value()获取值,而要直接用accumulator获取。
在driver端获取值的时候,不能直接用accumulator,而是要用用accumulator.value获取的,这一点和executor端有区别。
三.代码 实现
package com.bjsxt
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object Scala {
def main(args: Array[String]): Unit = {
/**
* 累加求和
*/
val conf=new SparkConf().setAppName("test").setMaster("local")
val sc=new SparkContext(conf)
val rdd1=sc.textFile("./words")
/**
* 初始化
* i=0
* 常量的初始化用var
*/
var i=0
rdd1.map(line=>{
/**
* 这个i是在executor端去执行的
* map 是一对一的
* 是一遍一遍地去处理的
*/
i+=1
accumulator.add(1)
println("Executor execuotr ="+accumulator)
//在executor端获取值的时候,不能用accumulator.value,而是直接用accumulator获取的,这一点和driver端有区别。
line
}).collect()//此处的collect是为了触发执行,没有别的意思
println("Driver executor ="+accumulator.value)
//在driver端获取值的时候,不能直接用accumulator,而是要用用accumulator.value获取的,这一点和executor端有区别。
//这个i是在driver端,不会进入Executor端去执行
/**
* 或者可以自定义累加求和(如下所示)
*/
val accumulator=sc.accumulator(0)
rdd1.map(lin=>{
accumulator.add(1)
}).collect()
println("Driver accumulator ="+accumulator.value)
}
}