spark学习笔记01

时间:2023-12-11 10:01:38

Spark入门

1、课程目标

  • 1、熟悉spark相关概念

  • 2、搭建一个spark集群

  • 3、编写简单spark应用程序

2、spark概述

  • spark是什么

    • 是基于内存的分布式计算引擎,计算速度非常快,仅仅只是涉及到数据的计算,没有涉及到数据存储。可以对接外部的数据源(比如hdfs,这个时候就需要搭建一个hadoop集群)

  • 为什么要学习spark

    • spark运行速度快,由于中间数据结果可以不落地,直接保存在内存中,速度比mapreduce快很多

3、spark特性

  • 速度快

    • spark比mapreduce在内存快100x,在磁盘中快10x

      • spark任务中间结果数据不可以不落地,直接保存在内存中

      • 在mapreduce任务中,如果当前有100 task,对应的会产生多少个进程去运行? 100个,mapreduce是以进程的方式去运行任务

      • 在spark任务中,如果当前有100个task,对应它只需要开启100个线程去运行就可以了,spark是以线程的方式运行任务。

  • 易用性

    • 可以快速编写spark应用程序,使用4种语言开发 java/scala/python/R

  • 通用性

    • 可以使用sparksql/sparkStreaming/Mlib/Graphx

  • 兼容性

    • spark可以运行在不同平台里面

      • yarn 整个资源调度由resourcemanager

      • mesos apache开源的一个资源调度框架

      • standalone 整个资源调度由 master

4、spark集群安装

  • 1、下载spark安装包

    • spark-2.0.2-bin-hadoop2.7.tgz

  • 2、先规划安装目录

    • /export/servers

  • 3、解压安装包

    • tar -zxvf spark-2.0.2-bin-hadoop2.7.tgz -C /export/servers

  • 4、重命名安装目录

    • mv spark-2.0.2-bin-hadoop2.7 spark

  • 5、修改配置文件

    • 进入到spark安装目录下conf文件夹

      • 修改spark-env.sh.template(先重命名)

        • mv spark-env.sh.template spark-env.sh

        • 配置java环境变量

          • export JAVA_HOME=/export/servers/jdk

        • 配置spark集群的master地址

          • export SPARK_MASTER_HOST=node1

        • 配置spark集群的master端口

          • export SPARK_MASTER_PORT=7077

      • 修改slaves.teamplate(先重命名)

        • mv slaves.template slaves

        • 添加spark集群中的worker节点

          • node2

          • node3

  • 6、配置spark环境变量

    • 修改 /etc/profile

      • export SPARK_HOME=/export/servers/spark

      • export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

  • 7、分发安装目录到其他节点

    • scp -r /export/servers/spark root@node2:/export/servers

    • scp -r /export/servers/spark root@node3:/export/servers

    • scp /etc/profile root@node2:/etc

    • scp /etc/profile root@node3:/etc

  • 8、让所有节点的环境变量生效

    • 在所有节点上执行命令

      • source /etc/profile

5、spark集群启动和停止

  • 启动spark集群

    • $SPARK_HOME/sbin/start-all.sh

  • 停止spark集群

    • $SPARK_HOME/sbin/stop-all.sh

6、spark的master web管理界面

7、基于zk的sparkHA高可用集群

  • 1、需要搭建一个zookeeper集群

  • 2、需要修改配置

    • $SPARK_HOME/conf/spark-env.sh

      • 1、注释掉手动指定的活着的master

        • #export SPARK_MASTER_HOST=node1

      • 2、引入zk配置

        export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER  -Dspark.deploy.zookeeper.url=node1:2181,node2:2181,node3:2181  -Dspark.deploy.zookeeper.dir=/spark"
  • 3、启动sparkHA

    • 先启动zookeeper

    • 在任意一台机器上启动 start-all.sh(需要配置任意2台机器之间的免登陆)

      • 当执行该脚本时候,它会在当前机器上产生一个master进程。

      • 在slaves文件中,获取对应worker节点,然后在指定的主机上启动worker进程

    • 可以在其他节点上单独启动master进程

      • start-master.sh

8、spark角色介绍

  • 1、driver

    • 就是运行main方法的进程,他会创建sparkContext

  • 2、application

    • 就是一个应用程序,它是包括driver的代码以及整个任务在计算时所需要的一些资源

  • 3、Master

    • 它是整个spark集群中的老大,它是负责任务的分配和资源调度

  • 4、Worker

    • 就是真正计算任务的节点

  • 5、Executor

    • 它是一个进程,会在worker节点启动运行任务

  • 6、Task

    • 它是spark集群中最小的工作单元,它是线程的方式运行在executor进程中

9、初识spark程序

  • 1、普通模式提交(指定当前集群中活着的master地址)

    bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master spark://node1:7077 \
    --executor-memory 1G \
    --total-executor-cores 2 \
    examples/jars/spark-examples_2.11-2.0.2.jar \
    100
  • 2、高可用模式提交

    bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master spark://node1:7077,node2:7077,node3:7077 \
    --executor-memory 1G \
    --total-executor-cores 2 \
    examples/jars/spark-examples_2.11-2.0.2.jar \
    100

10、spark-shell使用

  • 1、spark-shell --master local[2] 读取本地数据文件实现单词计数

    • local[N]

      • local表示本地运行,后面的数字N表示本地采用多少个线程去运行任务

    • local[*]

      • local表示本地运行,后面的* 表示使用当前机器上所有可用的资源

    • 他会产生一个SparkSubmit进程。

    • sc.textFile("file:///root/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
  • 2、spark-shell --master local[2] 读取HDFS数据文件实现单词计数

    sc.textFile("hdfs://node1:9000/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
    • spark整合HDFS

      • 需要修改spark-env.sh

        • export HADOOP_CONF_DIR=/export/servers/hadoop/etc/hadoop

        sc.textFile("/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
  • 3、spark-shell --master spark://node1:7077 读取HDFS数据文件实现单词计数

    sc.textFile("/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

11、利用scala编写spark的wordcount程序(本地运行)

  • 导入依赖

         <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>2.11.8</version>
      </dependency>
      <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>2.0.2</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>2.7.4</version>
      </dependency>
  • 代码开发

package cn.itcast

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

//todo:利用scala实现spark的wordcount程序
object WordCount {
def main(args: Array[String]): Unit = {
    //1、创建sparkconf 设置appName和master地址   local[2] 表示本地使用2个线程运行
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
    //2、创建sparkcontext,所有计算的源头,它会创建DAGScheduler和TaskScheduler
    val sc = new SparkContext(sparkConf)
    //3、读取数据文件
    val data: RDD[String] = sc.textFile("D:\\words.txt")
    //4、切分每一行
    val words: RDD[String] = data.flatMap(_.split(" "))
    //5、每个单词计为1
    val wordAndOne: RDD[(String, Int)] = words.map((_,1))
    //6、相同单词出现次数累加
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
    //7、打印输出
    val finalResult: Array[(String, Int)] = result.collect()
    println(finalResult.toBuffer)
  //关闭sc
  sc.stop()

}
}

12、利用scala编写spark的wordcount程序(集群运行)

package cn.itcast

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

//todo:利用scala实现spark的wordcount程序(集群运行)
object WordCount_Online {
def main(args: Array[String]): Unit = {
  //1、创建sparkconf 设置appName
  val sparkConf: SparkConf = new SparkConf().setAppName("WordCount_Online")
  //2、创建sparkcontext,所有计算的源头,它会创建DAGScheduler和TaskScheduler
  val sc = new SparkContext(sparkConf)
  //3、读取数据文件
  val data: RDD[String] = sc.textFile(args(0))
  //4、切分每一行
  val words: RDD[String] = data.flatMap(_.split(" "))
  //5、每个单词计为1
  val wordAndOne: RDD[(String, Int)] = words.map((_,1))
  //6、相同单词出现次数累加
  val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
  //7、保存结果数据到HDFS上
    result.saveAsTextFile(args(1))

  //关闭sc
  sc.stop()

}
}
  • 提交脚本

spark-submit --class cn.itcast.WordCount_Online --master spark://node1:7077 --executor-memory 1g --total-executor-cores 2 original-spark_class04-2.0.jar /words.txt /2018

13、利用java编写spark的wordcount程序(本地运行)

package cn.itcast;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

//todo:利用java语言实现spark的wordcount程序
public class WordCount_Java {
  public static void main(String[] args) {
      //1、创建sparkconf对象
      SparkConf sparkConf = new SparkConf().setAppName("WordCount_Java").setMaster("local[2]");
      //2、创建javaSparkContext
      JavaSparkContext jsc = new JavaSparkContext(sparkConf);
      //3、读取数据文件
      JavaRDD<String> dataJavaRDD = jsc.textFile("d:\\words.txt");
      //4、切分每一行
      JavaRDD<String> wordsJavaRDD = dataJavaRDD.flatMap(new FlatMapFunction<String, String>() {
          public Iterator<String> call(String line) throws Exception {
              String[] words = line.split(" ");
              return Arrays.asList(words).iterator();
          }
      });
      //5、每个单词计为1
      JavaPairRDD<String, Integer> wordAndOneJavaPairRDD = wordsJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {
          public Tuple2<String, Integer> call(String word) throws Exception {
              return new Tuple2<String, Integer>(word, 1);
          }
      });
      //6、相同单词出现的次数累加
      JavaPairRDD<String, Integer> resultJavaPairRDD = wordAndOneJavaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer v1, Integer v2) throws Exception {
              return v1 + v2;
          }
      });
        //按照单词出现的次数降序排列 需要把(单词,次数) 位置颠倒 (次数,单词)
      JavaPairRDD<Integer, String> reverseJavaRDD = resultJavaPairRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
          public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
              return new Tuple2<Integer, String>(t._2, t._1);
          }
      });

      //按照单词出现的次数降序排列 需要把(次数,单词)位置颠倒(单词,次数)
      JavaPairRDD<String, Integer> sortedRDD = reverseJavaRDD.sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
          public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
              return new Tuple2<String, Integer>(t._2, t._1);
          }
      });

      //7、打印结果
      List<Tuple2<String, Integer>> finalResult = sortedRDD.collect();
      for (Tuple2<String, Integer> tuple : finalResult) {
          System.out.println("单词:"+tuple._1+" 次数:"+tuple._2);
      }
      //8、关闭
      jsc.stop();

  }
}