spark累计器
因为task的执行是在多个Executor中执行,所以会出现计算总量的时候,每个Executor只会计算部分数据,不能全局计算。
累计器是可以实现在全局中进行累加计数。
注意:
累加器只能在driver端定义,driver端读取,不能在Executor端读取。
广播变量只能在driver端定义,在Executor端读取,Executor不能修改。
下面是实践的代码:
package SparkStreaming; import org.apache.commons.collections.iterators.ArrayListIterator;
import org.apache.commons.io.LineIterator;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2; import java.util.Iterator;
import java.util.List; public class totalization_device {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("totalization_device");
JavaSparkContext sc = new JavaSparkContext(conf);
/*
* 定义一个累加器
* */
Accumulator<Integer> accumulator = sc.accumulator();
JavaRDD<String> fileRDD = sc.textFile("E:/2018_cnic/learn/wordcount.txt");
JavaRDD<String> fileRDD1 = fileRDD.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
accumulator.add();
return new ArrayListIterator(s.split(" "));
}
});
JavaPairRDD<String, Integer> pairRDD = fileRDD1.mapToPair(new PairFunction<String, String, Integer>() { @Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s,);
}
});
JavaPairRDD<String, Integer> reducebykeyRDD = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
});
List<Tuple2<String, Integer>> collect = reducebykeyRDD.collect();
for(Tuple2 tup:collect){
System.out.println(tup);
}
Integer num = accumulator.value();
System.out.println("一共有:"+num+"行");
sc.close();
}
}
结果输出:
// :: INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID ) in ms on localhost (executor driver) (/)
// :: INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID ) in ms on localhost (executor driver) (/)
// :: INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
// :: INFO DAGScheduler: ResultStage (collect at totalization_device.java:) finished in 0.051 s
// :: INFO DAGScheduler: Job finished: collect at totalization_device.java:, took 0.273877 s
(,)
(authentication,)
(Registered,)
(is,)
(Found,)
(master.Master:,)
(spark.SecurityManager:,)
(util.log:,)
(,)
(modify,)
(classes,)
(,)
([jar:file:/opt/workspace/hive-3.1./lib/log4j-slf4j-impl-2.10..jar!/org/slf4j/impl/StaticLoggerBinder.class],)
(.,)
(type,)
(with,)
(INFO,)
(permissions:,)
(groups,)
(using,)
(//,)
(Class,)
(@1326ms,)
(WARN,)
(root,)
(signal,)
('MasterUI',)
(,)
(,)
(Set(root);,)
(version,)
(,)
(ui,)
(,)
(load,)
(Set();,)
(,)
(,)
(::,)
(Actual,)
(initialized,)
(server.Server:,)
(master,)
(,)
(multiple,)
(56130C,)
(handler,)
(,)
(,)
(TERM,)
(,)
(daemon,)
(bindings.,)
(builtin-java,)
(server.AbstractConnector:,)
(users,)
([jar:file:/opt/workspace/hbase-1.4./lib/slf4j-log4j12-1.7..jar!/org/slf4j/impl/StaticLoggerBinder.class],)
(http://www.slf4j.org/codes.html#multiple_bindings,1)
(105L,,)
(Starting,)
(jetty-9.3.z-SNAPSHOT,)
(Spark,)
(,)
(SLF4J,)
(platform...,)
(,)
(util.NativeCodeLoader:,)
(Successfully,)
(on,)
('sparkMaster',)
(library,)
(service,)
(,)
(at,)
(in,)
(,)
(@master1,)
(See,)
(.,)
(Logging,)
(missions:,)
(util.Utils:,)
(spark://master1:7077,1)
(for,)
(Changing,)
(,)
(native-hadoop,)
(port,)
(Running,)
(explanation.,)
(your,)
(view,)
(acls,)
(,)
(Unable,)
(binding,)
(to:,)
(disabled;,)
(contains,)
(util.SignalUtils:,)
(process,)
(,)
(SLF4J:,)
(ServerConnector@1cbf22af{HTTP/1.1,[http/1.1]}{0.0.0.0:},)
(,)
(,)
(,)
(SecurityManager:,)
(Started,)
(INT,)
(Set(),)
("spark-root-org.apache.spark.deploy.master.Master-1-master1.out",)
(to,)
(applicable,)
(HUP,)
(started,)
(of,)
(path,)
(where,)
(,)
(an,)
([jar:file:/opt/workspace/hadoop-2.9./share/hadoop/common/lib/slf4j-log4j12-1.7..jar!/org/slf4j/impl/StaticLoggerBinder.class],)
([org.slf4j.impl.Log4jLoggerFactory],)
(2.3.,)
(::,)
(@1280ms,)
(name:,)
(per,)
一共有:25行
// :: INFO SparkUI: Stopped Spark web UI at http://hadoop:4040
// :: INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
// :: INFO MemoryStore: MemoryStore cleared
// :: INFO BlockManager: BlockManager stopped
// :: INFO BlockManagerMaster: BlockManagerMaster stopped
// :: INFO
Spark累加器的更多相关文章
-
Spark 累加器
由于spark是分布式的计算,所以使得每个task间不存在共享的变量,而为了实现共享变量spark实现了两种类型 - 累加器与广播变量, 对于其概念与理解可以参考:共享变量(广播变量和累加器).可能需 ...
-
spark累加器、广播变量
一言以蔽之: 累加器就是只写变量 通常就是做事件统计用的 因为rdd是在不同的excutor去执行的 你在不同excutor中累加的结果 没办法汇总到一起 这个时候就需要累加器来帮忙完成 广播变量是只 ...
-
Spark累加器(Accumulator)陷阱及解决办法
累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变.累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数 ...
-
Spark累加器(Accumulator)
一.累加器简介 在Spark中如果想在Task计算的时候统计某些事件的数量,使用filter/reduce也可以,但是使用累加器是一种更方便的方式,累加器一个比较经典的应用场景是用来在Spark St ...
-
入门大数据---Spark累加器与广播变量
一.简介 在 Spark 中,提供了两种类型的共享变量:累加器 (accumulator) 与广播变量 (broadcast variable): 累加器:用来对信息进行聚合,主要用于累计计数等场景: ...
-
Spark(八)【广播变量和累加器】
目录 一. 广播变量 使用 二. 累加器 使用 使用场景 自定义累加器 在spark程序中,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的 ...
-
Spark处理日志文件常见操作
spark有自己的集群计算技术,扩展了hadoop mr模型用于高效计算,包括交互式查询和 流计算.主要的特性就是内存的集群计算提升计算速度.在实际运用过程中也当然少不了对一些数据集的操作.下面将通过 ...
-
spark面试总结3
Spark core面试篇03 1.Spark使用parquet文件存储格式能带来哪些好处? 1) 如果说HDFS 是大数据时代分布式文件系统首选标准,那么parquet则是整个大数据时代文件存储格式 ...
-
Spark面试相关
Spark Core面试篇01 随着Spark技术在企业中应用越来越广泛,Spark成为大数据开发必须掌握的技能.前期分享了很多关于Spark的学习视频和文章,为了进一步巩固和掌握Spark,在原有s ...
随机推荐
-
css让元素居中显示
通常在absolute之后, 想让元素居中,都会采用margin-top:-[元素高度的一半]和 margin-left:-[元素宽度的一半] , 但是当我们的元素宽高不是固定的时候, 这就难办了, ...
-
ecstore与淘宝sdk的autoload加载顺序问题
ecstore使用spl_autoload_register实现类的自动加载,这个很大的方便我们不用每次都要手动的去include一些类和函数.不过这样会导致一些问题,比如说,有一些extension ...
-
centos添加开机启动项目
centOS 配置开机自启动两种方式: 1.vi /etc/rc.d/rc.local 在此文件中加入启动的脚本 2.chkconfig 增加自己的脚本 --add --list --del 步骤: ...
-
hadoop群集安装中碰到的问题
在hadoop群集安装结束后,进行格式测试出现问题如下 格式化 cd /data/hadoop/bin ./hdfs namenode -format 15/01/21 05:21:17 WARN f ...
-
Android XML文件布局各个属性详解
第一常用类:属性值为true或false android:layout_centerHrizontal 水平居中 android:layout_centerVertical 垂直居中 android: ...
-
清楚form表单数据的便捷jQuery之法
有时候可能需要实现这样的效果:注册表单或者地址表单等填写多个记录之后,想要清除重新填写,如果一个个删除非常麻烦,因此这时清除按钮非常必须.接下来为您详细介绍两个自己经历的便捷方法,需要了解的朋友参考下 ...
-
异常处理:你不可能总是对的2 - 零基础入门学习Python033
异常处理:你不可能总是对的2 让编程改变世界 Change the world by program 我们已经了解足够多的可能碰到的异常,那我们这节课就来谈谈如何检测这些异常并处理它们. 异常检测我们 ...
-
SQL检索记录
<<第一章检索记录>>:关于表使用SELECT语句和特殊字符"*": *:SELECT * from emp; 1:分别列出每一行:SELECT empno ...
-
background相关属性
background-origin: 规定 background-position 属性相对于容器的哪一部分来定位. padding-box 背景图像相对于内边距框来定位:(默认) border-bo ...
-
centos6.5配置uwsgi与nginx支持django
一.centos中升级python 1. > wget https://www.python.org/ftp/python/3.5.4/Python-3.5.4.tgz # https://ww ...