Spark小课堂Week3 FirstSparkApp(RDD开发)

时间:2023-01-19 07:39:35

Spark小课堂Week3 FirstSparkApp

问题:Java有哪些数据结构

大致有如下几种,其中List与Map是最重要的:

  • List
  • Map
  • Set
  • Array
  • Heap
  • Stack
  • Queue
  • Tree

练习:构造一个1-5的List,把他们打印出来

写法1

        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
for (int i = 0; i < input.size(); i++) {
System.out.println(input.get(i));
}

这个写法存在的问题:

  1. 存在额外定义的变量i,需要对i进行逻辑控制(比如不能超过size)。
  2. 无法在循环内部如果对List进行安全更新,会产生逻辑问题。
  3. 这个写法非常的不推荐使用!!!

写法2

        Iterator<Integer> it = input.iterator();
while (it.hasNext()) {
System.out.println("a" + it.next());
}

几点说明:

  1. iterator相当于oracle中游标,拥有数据只读和只能一次性读的特点。
  2. hasNext()和next)()是游标必备的方法,其中hasNext是判断是否有下一条,等同%Found,next是获取下一条数,等同fetch。

    这个写法存在的问题:
  3. 存在额外的对象it,读取方法比较简单,但是也需要记忆。

写法3

        for (Integer i : input) {
System.out.println(i);
}

这是对写法2的优化,逻辑完全等价,是一个语法糖。

写法2和写法3共同存在的问题:

  1. 执行是串行的,对大数量的情况不适合。

写法4

input.parallelStream().forEach(
new Consumer<Integer>() {
@Override
public void accept(Integer a) {
System.out.println(a);
}
}
);

在这个写法中,执行是并行执行的,

从语句特点看是将一个对象出入了forEach方法中,进行一下说明:

  1. 多线程程序,其实是几个互相独立运行的程序,产生了一个问题,我们需要将逻辑传递给给各自独立运行的程序
  2. 由于在Java中对象是全局存储的,所以是唯一可以在独立运行程序中传递的介质。
  3. 但是我们要的传递的是逻辑,比如要转变为对象才能传递,如果变成对象,要做两件事情:
    • 把逻辑,用方法来封装(示例中的apply方法)
    • 把方法,用class或者intereface来封装,更推荐intereface(示例中的Consumer)
      • 因为class既可以包含数据、又可以包含方法,而intereface中只包含方法,在我们的场景中,只需要传递方法
  4. 最后,接口提供方预先将intereface和method定义好,调用者按照模板使用即可。

    存在的问题:
  5. 语句比较长,逻辑也比较复杂

写法5

        input.parallelStream().forEach(
a -> System.out.println(a)
);

这个写法和写法4逻辑没有任何区别,主要是用lambda表达式简化了对象的写法,是一个语法糖。

写法4和写法5共同的问题是:

当数据量进一步扩大,一台机器就算是多线程运行也无法完成的情况下,是无法处理的。

小结

在上述的几个写法中,主要是通过对于List的一系列增强,从而解决了一系列的问题。

  1. 写法2、3相比写法1,使用Iterator增强List,实现了数据的可更新。
  2. 写法4、5相比前面的写法,使用Stream增强Iterator,实现了数据的并发运算。

    但是,还是留下了在更大数据集情况下的处理。这个是时候,我们需要引入Spark。

    Spark中核心是RDD,是对Stream的进一步增强,在并发的基础上,增加了同时在多台机器上的分布式计算。解决了大数据集的问题。

练习:RDD编程

题目

进行RDD操作的训练:

  1. 读取交易记录
  2. 按照SecurityId进行计数
  3. 根据计数,从高到低排序
  4. 输出结果

练习过程:

Step1:获取RDD数据,这个是调用公共方法,请注意的是,PracticePojo是预先准备好的测试数据名称。

JavaRDD<PracticePojo> inputTradeRecords = this.getInputRDD(PracticePojo.class);

Step2:将数据转为Key-Value格式

因为在Spark处理中,reduce、sort、group等操作涉及到在分布式机器间的数据交互,数据必须要有Key来作为分布操作的依据,所以我们首先要将数据格式进行转换。

        JavaPairRDD<String, Integer> mappedTradeRecords = inputTradeRecords.mapToPair(
new PairFunction<PracticePojo, String, Integer>() {
@Override
public Tuple2<String, Integer> call(PracticePojo practicePojo) throws Exception {
return new Tuple2<String, Integer>(practicePojo.getSecurityId(), 1);
}
});

Step3:执行计数操作。

这里会用到reduceByKey方法,这里要注意的是,reduce是一个非常常用的操作,它有两个操作步骤:

  1. 对数据按照Key进行分组
  2. 对每组数据执行算子计算,需要注意reduce的计算必须满足交换律和结合律
    • 这个算子是指,当整个计算过程都满足交换律和结合律后,我们可以用任意两个数据之间的计算关系来定义整个计算关系
    • 比如:sum和count,可以用加法表示,max可以用a>b?a:b这样的计算来表示,min是反过来
        JavaPairRDD<String, Integer> reducedTradeRecords = mappedTradeRecords.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});

Step4:执行排序

这里有两步操作:

  1. 首先是使用mapToPair方法进行数据变形,因为sortByKey方法仅是针对key来排序,而我们原始数据的key并不是我们要的排序字段,所以首先需要将key和value换一下
  2. 用sortByKey执行排序操作,需要注意的是,参数是一个布尔值,默认为升序,false表示降序
     JavaPairRDD<Integer, String> reversedTradeRecords = reducedTradeRecords.mapToPair(
new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return new Tuple2<Integer, String>(stringIntegerTuple2._2, stringIntegerTuple2._1);
}
}); JavaPairRDD<Integer, String> sortedTradeRecords = reversedTradeRecords.sortByKey(false);

Step5:输出结果

一般我们都是输出一个JavaRDD,这里采用了预先定义的PracticeResultPojo结构,会将排序完的结果映射到这个结构上。

JavaRDD<PracticeResultPojo> resultTradeRecords = sortedTradeRecords.map(
new Function<Tuple2<Integer, String>, PracticeResultPojo>() {
@Override
public PracticeResultPojo call(Tuple2<Integer, String> v1) throws Exception {
PracticeResultPojo resultPojo = new PracticeResultPojo();
resultPojo.setSecurityId(v1._2);
resultPojo.setCount(v1._1);
return resultPojo;
}
});

小结

RDD编程,难点如下:

  1. 采用对象方式来封装逻辑,和stream中处理方式一致,但是由于算子较多,每个算子要求不同,需要有一定练习来熟悉
  2. 输入输出为JavaRDD,但reduce、group、sort等操作需要中间使用mapToPair将JavaRDD转成JavaPairRDD才能操作,会涉及到多次转换
  3. reduce操作中,算子是更高层次的抽象,有一定的理解难度

关于

小课堂是在公司进行内部交流的一系列主题,偏基础,但是比较零散,持续更新中。

Spark小课堂Week3 FirstSparkApp(RDD开发)的更多相关文章

  1. Spark小课堂Week3 FirstSparkApp&lpar;Dataframe开发&rpar;

    Spark小课堂Week3 FirstSparkApp(代码优化) RDD代码简化 对于昨天练习的代码,我们可以从几个方面来简化: 使用fluent风格写法,可以减少对于中间变量的定义. 使用lamb ...

  2. Spark小课堂Week5 Scala初探

    Spark小课堂Week5 Scala初探 Scala是java威力加强版. 对Java的改进 这里会结合StreamingContext.scala这个代码说明下对Java的改进方面. 方便测试方式 ...

  3. Spark小课堂Week6 启动日志详解

    Spark小课堂Week6 启动日志详解 作为分布式系统,Spark程序是非常难以使用传统方法来进行调试的,所以我们主要的武器是日志,今天会对启动日志进行一下详解. 日志详解 今天主要遍历下Strea ...

  4. Spark小课堂Week2 Hello Streaming

    Spark小课堂Week2 Hello Streaming 我们是怎么进行数据处理的? 批量方式处理 目前最常采用的是批量方式处理,指非工作时间运行,定时或者事件触发.这种方式的好处是逻辑简单,不影响 ...

  5. Spark小课堂Week1 Hello Spark

    Spark小课堂Week1 Hello Spark 看到Spark这个词,你的第一印象是什么? 这是一朵"火花",官方的定义是Spark是一个高速的.通用的.分布式计算系统!!! ...

  6. Spark小课堂Week7 从Spark中一个例子看面向对象设计

    Spark小课堂Week7 从Spark中一个例子看面向对象设计 今天我们讨论了个问题,来设计一个Spark中的常用功能. 功能描述:数据源是一切处理的源头,这次要实现下加载数据源的方法load() ...

  7. Spark小课堂Week4 从控制台看Spark逻辑结构

    Spark小课堂Week4 从控制台看Spark逻辑结构 层级关系: 从监控控制台,我们可以看到如下关系: 一个 Job 包含 n Stage 一个 Stage 包含 n Task Job0解决什么问 ...

  8. Spark踩坑记——从RDD看集群调度

    [TOC] 前言 在Spark的使用中,性能的调优配置过程中,查阅了很多资料,之前自己总结过两篇小博文Spark踩坑记--初试和Spark踩坑记--数据库(Hbase+Mysql),第一篇概况的归纳了 ...

  9. Hive数据分析——Spark是一种基于rdd(弹性数据集)的内存分布式并行处理框架,比于Hadoop将大量的中间结果写入HDFS,Spark避免了中间结果的持久化

    转自:http://blog.csdn.net/wh_springer/article/details/51842496 近十年来,随着Hadoop生态系统的不断完善,Hadoop早已成为大数据事实上 ...

随机推荐

  1. IE8和W3C标准下IFRAME刷新和URL的区别

    一个页面中包含了一个iframe,我们要刷新这个iframe的情况 url在IE8和W3C以及IE11的区别如下: URL使用相对路径,绝对路径比如http://localhost:5568/替换成I ...

  2. AngularJS&colon; &&num;39&semi;Template for directive must have exactly one root element&&num;39&semi; when using &&num;39&semi;th&&num;39&semi; tag in directive template

    .controller('HomeController', function($scope,$location) { $scope.userName='天下大势,为我所控!'; $scope.clkU ...

  3. Java基础之线程——管理线程同步代码块(BankOperation4)

    控制台程序. 除了同步类对象的方法之外,还可以把程序中的语句或代码块制定为synchronized,这种方式更强大,因为可以指定哪个对象从语句或代码块的同步中获益,而不像同步方法那样仅仅是包含代码的对 ...

  4. 如何配置svn服务器&lpar;通过VisualServer服务器&rpar;

    如果你已经安装好了VisualServer服务器,现在让我们一起来配置svn服务器吧

  5. &lbrack;技巧&rsqb;使用Xcode集成的HeaderDoc自动生成注释和开发文档

    [技巧]使用Xcode集成的HeaderDoc自动生成注释和开发文档     Doxygen本来是一个很好的工具,可是我感觉在mac系统下,如果用doxygen最后生成的CHM文件感觉就不是那么恰当, ...

  6. HDU 4612 Warm up(双连通分量缩点&plus;求树的直径)

    思路:强连通分量缩点,建立一颗新的树,然后求树的最长直径,然后加上一条边能够去掉的桥数,就是直径的长度. 树的直径长度的求法:两次bfs可以求,第一次随便找一个点u,然后进行bfs搜到的最后一个点v, ...

  7. SSM-SpringMVC-16:SpringMVC中小论注解式开发之访问方式篇

     ------------吾亦无他,唯手熟尔,谦卑若愚,好学若饥------------- 访问方式可以指定,打个比方,你通过get方式进入登陆页面,通过post发送ajax数据库校验或者post提交 ...

  8. 《前端之路》之 JavaScript 进阶技巧之高阶函数(下)

    目录 第二章 - 03: 前端 进阶技巧之高阶函数 一.防篡改对象 1-1:Configurable 和 Writable 1-2:Enumerable 1-3:get .set 2-1:不可扩展对象 ...

  9. Java网络编程案例---聊天室

    网络编程是指编写运行在多个设备(计算机)的程序,这些设备都通过网络连接起来. java.net包中JavaSE的API包含有类和接口,它们提供低层次的通信细节.你可以直接使用这些类和接口,来专注于解决 ...

  10. Mininet 系列实验(一)

    关于SDN的第一个实验,似乎实验室里的前辈们也都是从这里开始的. 实验内容 使用源码安装Mininet 参考 Mininet使用源码安装 实验环境 虚拟机:Oracle VM VirtualBox U ...