Spark菜鸟学习营Day1 从Java到RDD编程

时间:2022-06-15 05:57:18

Spark菜鸟学习营Day1

从Java到RDD编程

菜鸟训练营主要的目标是帮助大家从零开始,初步掌握Spark程序的开发。

Spark的编程模型是一步一步发展过来的,今天主要带大家走一下这段路,让我们从一段最最基础的Java代码开始。

问题:Java有哪些数据结构

大致有如下几种,其中List与Map是最重要的:

  • List
  • Map
  • Set
  • Array
  • Heap
  • Stack
  • Queue
  • Tree

练习:构造一个1-5的List,把他们打印出来

写法1

        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
for (int i = 0; i < input.size(); i++) {
System.out.println(input.get(i));
}

这个写法存在的问题:

  1. 存在额外定义的变量i,需要对i进行逻辑控制(比如不能超过size)。
  2. 无法在循环内部如果对List进行安全更新,会产生逻辑问题。
  3. 这个写法非常的不推荐使用!!!

写法2

        Iterator<Integer> it = input.iterator();
while (it.hasNext()) {
System.out.println("a" + it.next());
}

几点说明:

  1. iterator相当于oracle中游标,拥有数据只读和只能一次性读的特点。
  2. hasNext()和next)()是游标必备的方法,其中hasNext是判断是否有下一条,等同%Found,next是获取下一条数,等同fetch。

    这个写法存在的问题:
  3. 存在额外的对象it,读取方法比较简单,但是也需要记忆。

写法3

        for (Integer i : input) {
System.out.println(i);
}

这是对写法2的优化,逻辑完全等价,是一个语法糖。

写法2和写法3共同存在的问题:

  1. 执行是串行的,对大数量的情况不适合。

写法4

input.parallelStream().forEach(
new Consumer<Integer>() {
@Override
public void accept(Integer a) {
System.out.println(a);
}
}
);

在这个写法中,执行是并行执行的,

从语句特点看是将一个对象出入了forEach方法中,进行一下说明:

  1. 多线程程序,其实是几个互相独立运行的程序,产生了一个问题,我们需要将逻辑传递给给各自独立运行的程序
  2. 由于在Java中对象是全局存储的,所以是唯一可以在独立运行程序中传递的介质。
  3. 但是我们要的传递的是逻辑,比如要转变为对象才能传递,如果变成对象,要做两件事情:
    • 把逻辑,用方法来封装(示例中的apply方法)
    • 把方法,用class或者intereface来封装,更推荐intereface(示例中的Consumer)
      • 因为class既可以包含数据、又可以包含方法,而intereface中只包含方法,在我们的场景中,只需要传递方法
  4. 最后,接口提供方预先将intereface和method定义好,调用者按照模板使用即可。

    存在的问题:
  5. 语句比较长,逻辑也比较复杂

写法5

        input.parallelStream().forEach(
a -> System.out.println(a)
);

这个写法和写法4逻辑没有任何区别,主要是用lambda表达式简化了对象的写法,是一个语法糖。

写法4和写法5共同的问题是:

当数据量进一步扩大,一台机器就算是多线程运行也无法完成的情况下,是无法处理的。

小结

在上述的几个写法中,主要是通过对于List的一系列增强,从而解决了一系列的问题。

  1. 写法2、3相比写法1,使用Iterator增强List,实现了数据的可更新。
  2. 写法4、5相比前面的写法,使用Stream增强Iterator,实现了数据的并发运算。

    但是,还是留下了在更大数据集情况下的处理。这个是时候,我们需要引入Spark。

    Spark中核心是RDD,是对Stream的进一步增强,在并发的基础上,增加了同时在多台机器上的分布式计算。解决了大数据集的问题。

练习:RDD编程

题目

进行RDD操作的训练:

  1. 读取交易记录
  2. 按照SecurityId进行计数
  3. 根据计数,从高到低排序
  4. 输出结果

练习过程:

Step1:获取RDD数据,这个是调用公共方法,请注意的是,PracticePojo是预先准备好的测试数据名称。

JavaRDD<PracticePojo> inputTradeRecords = this.getInputRDD(PracticePojo.class);

Step2:将数据转为Key-Value格式

因为在Spark处理中,reduce、sort、group等操作涉及到在分布式机器间的数据交互,数据必须要有Key来作为分布操作的依据,所以我们首先要将数据格式进行转换。

        JavaPairRDD<String, Integer> mappedTradeRecords = inputTradeRecords.mapToPair(
new PairFunction<PracticePojo, String, Integer>() {
@Override
public Tuple2<String, Integer> call(PracticePojo practicePojo) throws Exception {
return new Tuple2<String, Integer>(practicePojo.getSecurityId(), 1);
}
});

Step3:执行计数操作。

这里会用到reduceByKey方法,这里要注意的是,reduce是一个非常常用的操作,它有两个操作步骤:

  1. 对数据按照Key进行分组
  2. 对每组数据执行算子计算,需要注意reduce的计算必须满足交换律和结合律
    • 这个算子是指,当整个计算过程都满足交换律和结合律后,我们可以用任意两个数据之间的计算关系来定义整个计算关系
    • 比如:sum和count,可以用加法表示,max可以用a>b?a:b这样的计算来表示,min是反过来
        JavaPairRDD<String, Integer> reducedTradeRecords = mappedTradeRecords.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});

Step4:执行排序

这里有两步操作:

  1. 首先是使用mapToPair方法进行数据变形,因为sortByKey方法仅是针对key来排序,而我们原始数据的key并不是我们要的排序字段,所以首先需要将key和value换一下
  2. 用sortByKey执行排序操作,需要注意的是,参数是一个布尔值,默认为升序,false表示降序
     JavaPairRDD<Integer, String> reversedTradeRecords = reducedTradeRecords.mapToPair(
new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return new Tuple2<Integer, String>(stringIntegerTuple2._2, stringIntegerTuple2._1);
}
}); JavaPairRDD<Integer, String> sortedTradeRecords = reversedTradeRecords.sortByKey(false);

Step5:输出结果

一般我们都是输出一个JavaRDD,这里采用了预先定义的PracticeResultPojo结构,会将排序完的结果映射到这个结构上。

JavaRDD<PracticeResultPojo> resultTradeRecords = sortedTradeRecords.map(
new Function<Tuple2<Integer, String>, PracticeResultPojo>() {
@Override
public PracticeResultPojo call(Tuple2<Integer, String> v1) throws Exception {
PracticeResultPojo resultPojo = new PracticeResultPojo();
resultPojo.setSecurityId(v1._2);
resultPojo.setCount(v1._1);
return resultPojo;
}
});

小结

RDD编程,难点如下:

  1. 采用对象方式来封装逻辑,和stream中处理方式一致,但是由于算子较多,每个算子要求不同,需要有一定练习来熟悉
  2. 输入输出为JavaRDD,但reduce、group、sort等操作需要中间使用mapToPair将JavaRDD转成JavaPairRDD才能操作,会涉及到多次转换
  3. reduce操作中,算子是更高层次的抽象,有一定的理解难度

Spark菜鸟学习营Day1 从Java到RDD编程的更多相关文章

  1. Spark菜鸟学习营Day5 分布式程序开发

    Spark菜鸟学习营Day5 分布式程序开发 这一章会和我们前面进行的需求分析进行呼应,完成程序的开发. 开发步骤 分布式系统开发是一个复杂的过程,对于复杂过程,我们需要分解为简单步骤的组合. 针对每 ...

  2. Spark菜鸟学习营Day4 单元测试程序的编写

    Spark菜鸟学习营Day4 单元测试程序的编写 Spark相比于传统代码是比较难以调试的,单元测试的编写是非常必要的. Step0:需求分析 在测试案例编写前,需完成需求分析工作,明确程序所有的输入 ...

  3. Spark菜鸟学习营Day6 分布式代码运行调试

    Spark菜鸟学习营Day6 分布式代码运行调试 作为代码调试,一般会分成两个部分 语法调试,也就是确定能够运行 结果调试,也就是确定程序逻辑的正确 其实这个都离不开运行,所以我们说一下如何让开发的S ...

  4. Spark菜鸟学习营Day3 RDD编程进阶

    Spark菜鸟学习营Day3 RDD编程进阶 RDD代码简化 对于昨天练习的代码,我们可以从几个方面来简化: 使用fluent风格写法,可以减少对于中间变量的定义. 使用lambda表示式来替换对象写 ...

  5. Spark菜鸟学习营Day2 分布式系统需求分析

    Spark菜鸟学习营Day2 分布式系统需求分析 本分析主要针对从原有代码向Spark的迁移.要注意的是Spark和传统开发有着截然不同的思考思路,所以我们需要首先对原有代码进行需求分析,形成改造思路 ...

  6. spark学习(六)Java版RDD基本的基本操作

    1.map算子 private static void map() { //创建SparkConf SparkConf conf = new SparkConf() .setAppName(&quot ...

  7. 《Spark快速大数据分析》—— 第三章 RDD编程

  8. 【Spark深入学习 -10】基于spark构建企业级流处理系统

    ----本节内容------- 1.流式处理系统背景 1.1 技术背景 1.2 Spark技术很火 2.流式处理技术介绍 2.1流式处理技术概念 2.2流式处理应用场景 2.3流式处理系统分类 3.流 ...

  9. 菜鸟学习Spring——60s配置XML方法实现简单AOP

    一.概述. 上一篇博客讲述了用注解的形式实现AOP现在讲述另外一种AOP实现的方式利用XML来实现AOP. 二.代码演示. 准备工作参照上一篇博客<菜鸟学习Spring--60s使用annota ...

随机推荐

  1. Sql Server系列:数据库操作

    1 创建数据库 1.1 CREATE DATABASE语法 CREATE DATABASE database_name [ ON [ PRIMARY ] <filespec> [ ,... ...

  2. Linux Shell 高级编程技巧1----深入讨论(awk、&lt&semi;&lt&semi;)

    1.深入讨论(awk.<<)    1.1.深入讨论awk        记录和域,模式和动作,正则表达式和元字符            基础教程中已经介绍        条件和逻辑操作符 ...

  3. ogrinfo使用

    简介 orginfo是OGR模块中提供的一个重要工具,用于读取地图文件中记录,可以指定筛选条件(按字段.sql.矩形范围) 使用方式 命令行参数 Usage: ogrinfo [--help-gene ...

  4. JavaScript Patterns 4&period;1 Functions Background

    Functions are first-class objects and they provide scope. • Can be created dynamically at runtime, d ...

  5. 统计难题 HDOJ --1251

    统计难题 Time Limit: 4000/2000 MS (Java/Others)    Memory Limit: 131070/65535 K (Java/Others)Total Submi ...

  6. 常用排序算法之——快速排序(C语言&plus;VC6&period;0平台)

    经典排序算法中快速排序具有较好的效率,但其实现思路相对较难理解. #include<stdio.h> int partition(int num[],int low,int high) / ...

  7. nlp底层技术列举

    其实目前除了之前博客写到的一些关于自然语言处理用到的知识点之外,很多其他nlp技术只是会用但是不了解原理,先整体分个类,之后再仔细分析吧. 上图是https://www.sohu.com/a/1386 ...

  8. CentOS 7 基础命令安装

    https://my.oschina.net/u/1428349/blog/288708 1. ifconfig安装 > yum install net-tools 临时变量(可以直接使用sbi ...

  9. 直接修改class文件内容即使是文本会导致App异常,正确方式是修改java再用生成的class替换掉原有的class

    前几天来了个小任务,把某项目中某人的邮件地址改了下. 由于对项目不熟悉,于是采用find方式找出app中所有包含某人邮件地址的文件都找出来了. xml,properties大约三四个,还有两个clas ...

  10. PLSQL常用配置之窗口&sol;版面保存、SQL格式化&sol;美化、SQL注释&bsol;去掉注释等快捷键配置、登陆历史修改配置

    http://blog.csdn.net/hyeidolon/article/details/8251791   PLSQL常用配置之窗口/版面保存.SQL格式化/美化.SQL注释\去掉注释等快捷键配 ...