PySpark笔记

时间:2022-12-10 21:17:18

spark源码位置:https://github.com/apache/spark

Spark Core核心RDD及编程

什么是RDD:
1.是一个抽象类不能直接使用,在子类中实现抽象方法是一个抽象类不能直接使用,在子类中实现抽象方法
2.带泛型的,可以支持多种类型:例如可以传入string,person,user
3.弹性分布式数据集,一个不可变的,可并行操作的元素分区集合

RDD都有五个主要特性:
1.-分区列表:一个RDD由多个分区(partition)构成
2.-计算每个分区(partition)的函数
3.-依赖于其他rdd的列表
4.-可选的,键值RDDs的分区器(例如,RDD是哈希分区的)
5.-可选的,计算每个分区的最佳位置列表(例如,位置块是一个HDFS文件)

面试常考:hashset和hashmap怎么实现的?涉及hashcode方法

Spark程序必须做的第一件事是创建一个SparkContext对象,
它告诉Spark如何访问集群。要创建SparkContext前需要先创建一个SparkConf对象。
Spark功能的主要入口点。SparkContext表示与Spark群集的连接,通过SparkContext来创建RDD和广播变量到该集群上

RDD常用算子编程详解
RDD有3类操作:Transformations算子,Action算子,Cache操作(缓存操作)

Spark中的所有转换都是惰性(lazy)的,不会立即计算结果。只记住应用于基本数据集(例如文件)的转换。只有当这个操作需要将结果返回到驱动程序时才会计算转换。这种设计使Spark能够更有效地运行。例如,我们可以意识到通过map创建的数据集将在reduce中使用,并且只将reduce的结果返回给驱动程序,而不是更大的映射数据集

Spark运行模式

在Spark中支持4中运行模式:
1.local模式:开发时使用,运行基本功能,然后提交到YARN生产上。
2.standalone模式:是Spark自带的,如果一个集群是standalone模式,需要在多台机器上同时部署Spark环境(不经常用)
3.YARN模式:建议生产使用YARN模式,统一使用YARN进行整个集群作业(MR/Spark)的资源调度
4.Mesos模式:国内用的不多

不管用什么模式,Spark应用代码是一样的,只需提交时通过--master参数来指定运行模式

启动Spark:在$SPARK_HOME/bin/目录下执行./spark-shell --master local[2] '因为spark需要访问hive驱动'

local运行模式:

$SPARK_HOME/bin目录中的Spark -submit脚本用于在集群上启动应用程序。它可以通过统一的接口使用Spark支持的所有集群管理器,因此您不必专门为每个集群管理器配置应用程序

local模式: 开发时经常使用
--master
--name
--py-files

执行方式:在$SPARK_HOME/bin/目录下./spark-submit --master local[2] --name spark-local /home/hadoop/script/spark0402.py(代码文件位置)

file:///home/hadoop/data/hello.txt(输入) file:///home/hadoop/wc/output(输出)

local模式:./spark-submit --master local[2] --name spark-local /home/hadoop/script/spark0402.py file:///home/hadoop/data/hello.txt file:///home/hadoop/wc/output

yarn模式:./spark-submit --master yarn --name spark-local /home/hadoop/script/spark0402.py file:///home/hadoop/data/hello.txt file:///home/hadoop/wc/output

standalone运行模式:

1.环境配置需要在$SPARK_HOME/conf目录下将slaves.template,拷贝为slaves后并添加主机名。

2.要在$SPARK_HOME/conf目录下的spark-env.sh中添加$JAVA_HOME和$PYTHON_HOME安装路径,否则会报错

3.启动spark集群:$SPARK_HOME/sbin/./start-all.sh

4.检测启动spark集群是否成功:输入命令jps:有Master和Worker 2个进程,就说明standalone模式安装成功

5.Web界面检测spark集群是否启动成功:http://hadoop000:8080,另外:spark://hadoop000:7077是spark在提交作业时指定的端口。

6.举例使用standalone模式,提交作业:$SPARK_HOME/bin目录下./pyspark --master spark://hadoop000:7077

7.standalone模式spark-submit提交运行:./spark-submit --master spark://hadoop000:7077 --name spark-standalone /home/hadoop/script/spark_test.py file:///home/hadoop/data/hello.txt file:///home/hadoop/wc/output

注意:1.如果使用standalone模式。2.节点个数大于1。3.使用本地文件进行测试,必须要保证每个节点上都有本地测试文件,因为是分布式运行会找不到测试文件。

8.针对于第7点可以使用hdfs进行测试:./spark-submit --master spark://hadoop000:7077 --name spark-standalone /home/hadoop/script/spark_test.py hdfs://hadoop000:8020/hello.txt hdfs://hadoop000:8020/wc/output

9.yarn模式:mapreduce运行在yarn上,spark on yarn(相同于用spark作为客户端,spark需要做的事情是提交作业到yarn上运行),该模式要启动HDFS和YARN。

yarn模式和standalone模式有什么区别?

yarn模式只需要一个节点,然后提交作业即可。不需要spark集群(不需要启动master和worker进程),如果是standalone模式,spark集群上的每个节点都需要部署spark,然后启动spark集群(需要master和worker进程)

注意:需要指定hadoop_conf_dir或者yarn_conf_dir的配置文件路径。在$SPARK_HOME/conf目录下的spark-env.sh添加$HADOOP_HOME/etc/hadoop配置文件路径。

例如:./spark-submit --master yarn --name spark-yarn /home/hadoop/script/spark_test.py hdfs://hadoop000:8020/hello.txt hdfs://hadoop000:8020/wc/output

10.yarn支持client和cluster模式,二者的区别?

主要是driver运行在什么地方,如果运行在client上,提交作业的进程不能停止否则作业就挂死了,如果运行在cluster上,提交完成作业后提交作业端就可以断开,因为driver运行在AM里面。

yarn默认运行在client上,如果要运行在cluster上

运行在client上:./pyspark --master yarn-client

pyspark/spark-shell是交互式运行程序,交互式运行程序启动以后可以输入代码,只能运行在client里面

如何查看已经运行完的yarn的日志信息?

$SPARK_HOME/bin目录下执行yarn logs -applicationId <applicationID>日志名称,日志位置在界面hadoop:8088中的applicationID

spark核心概念:https://spark.apache.org/docs/latest/cluster-overview.html-----》Glossary
词汇表

应用:建立在Spark上的用户程序。由集群上的驱动程序和执行程序组成
应用jar:包含用户的Spark应用程序的jar。在某些情况下,用户希望创建一个“uber jar”,其中包含他们的应用程序及其依赖项。用户的jar不包含Hadoop或Spark库,但是,这些库将在运行时添加
驱动程序:运行应用程序的main()函数并创建SparkContext的进程
集群管理器:用于获取集群上的资源的外部服务(例如,独立管理器、Mesos、YARN)
部署模式:区分驱动程序进程在何处运行。在“集群”模式下,框架在集群内部启动驱动程序。在“客户端”模式下,提交者在集群外部启动驱动程序
工作节点:可以在集群中运行应用程序代码的任何节点
执行者:为工作节点上的应用程序启动的进程,它运行任务并将数据保存在它们之间的内存或磁盘存储器中。每个应用程序都有自己的执行器
任务:将被发送给一个执行的工作单元
工作:由多个任务组成的并行计算,这些任务在Spark操作时生成(例如保存、收集);在驱动程序日志中看到这个术语
阶段:每个作业被划分为更小的任务子集,这些任务子集彼此依赖(类似于MapReduce中的map和reduce阶段);在驱动程序日志中看到这个术语

Spark和Hadoop重要概念区分:

Hadoop概念
1.一个MR程序=一个Job
2.一个Job=1个或N个Task(map/reduce)
3.1个task对应一个进程
4.task运行时开启进程,task执行完毕后销毁进程,对应多个task来说开销比较大的(即使你能通过JVM共享)
spark概念
1.一个应用=Driver(main方法中创建SparkContext)+Executors
2.一个应用=0个到多个Job
3.一个Job=一个Acton
4.一个Job=1到N个Stage
5.一个Stage=1到N个task
6.一个task对应一个线程,多个task可以以并行的方式运行在一个Excutors进程中

Spark Cache(缓存):
cache是一个lazy,遇到action才会执行
cache好处是:如果一个RDD在后续的计算中可能被使用到建议使用cache。缓存是通过BlockManager来完成

persist()或cache()区别:
cache底层调用的是persist方法,传入的参数是:StorageLevel.MEMORY_ONLY,所以,cache=persist,cache没有参数

Spark窄依赖和宽依赖:窄依赖定义:一个父RDD的分区(partition)最多被子RDD的某个分区(partition)使用一次,流水线作业。包含的算子有:filter map flatmap mapPartitions
宽依赖定义:一个父RDD的分区(partition)会被子RDD的分区(partition)使用多次,有shuffle。包含的算子有:reduceByKey grupByKey combineByKey

historyserver:
$SPARK_HOME/conf/spark-default.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenode:端口号

Flume是一个分布式日志采集传输系统:
event(事件)是Flume的基本数据单位由消息头和消息体组成。Flume运行的核心是Agent

Spark SQL,PySpark不能操作DataSet

Hive:on MapReducce
SQL翻译成MapReducce提交到Hadoop Cluster运行

metastores相当于一个表的元数据信息

spark核心版本出现核心功能,例如:spark1.6出现Dataset,1.3出现DataFrames,Dateset不支持python原因是python是动态语言
DataFrames:是以列(列名,列类型,列值)的形式构成分布式数据集

面试题:RDD与DataFrame的区别:

Spark Streaming:
是spark streaming是spark API的扩展,streaming是什么?IO操作有输入有输出。输入:数据源头(例如:Kafka、Flume、Kinesis或TCP套接字等多个源获取);输出:数据的流向(例如:文件系统、数据库)。

问:安装完spark后能否直接使用spark streaming?答:不需要搭建完spark后可直接使用

常用实时流处理框架

spark streaming:不是真正的实时流处理框架,而是一个mini bach操作,使用spark栈一站式批处理解决问题。底层以批处理为主,以流处理为辅。不需要搭建集群
storm:真正的实时流处理 Tuple 用java写的需要搭建集群
flink:与spark streaming相反
kafka storm:国内用的很少

SparkStreaming执行原理:Spark流接收实时输入数据流,并将数据流拆分为很小的数据批次,然后数据批次由Spark引擎处理,生成最终的批次结果流。

sparkCore的核心抽象是RDD,对应的5大特性,对应源码中的5个方法是什么?
sparkstreaming的核心抽象是DStream,DStream是连续的数据流

面试点:sparkcore的存储策略和sparkstreaming的存储策略区别?

Azkaban(工作流调度器):

官网地址:https://azkaban.readthedocs.io/en/latest/getStarted.html#getting-started-with-the-solo-server
Spark SQL/Hadoop用于做离线统计处理
有一个典型的ETL操作:该操作步骤
1.数据抽取:可以使用Sqoop把RDBMS关系型数据库中的数据抽取到Hadoop,如果数据是在其他平台以文本的方式存储可以使用Flume进行日志,文本数据的采集到Hadoop
2.数据处理,可以采用Hive/MapReducce/Spark/...不同的框架实现
3.统计结果入库:
a)数据存放到HDFS(Hive/Spark SQL表/文件)上,启动一个server:在Hive里面叫Server2,在Spark里面叫ThriftServer,通过JDBC方式操作统计结果。
b)使用sqoop框架把结果导出RDBMS中

简单的任务调度:直接使用linux的crontab来定义,crontab+shell,优点是简单,易用。缺点是维护繁琐,依赖关系强

Azkaban架构包括3个核心组件:
关系型数据库(MySQL),AzkabanWeb服务(server),Azkaban执行服务(ExccutorServer)

Azkaban运行模式:

有2种模式:solo-server模式,分布式多执行器模式

Azkaban源码编译:
1.去github上下载源码包
2../gradlew build installDist
3.先下载gradle-4.6-all.zip,然后整合到azkaban源码中,避免在编译过程中去网络下载,将该文件拷贝到/azkaban-3.57.0/gradle/wrapper目录下,然后修改gradle-wrapper.properties配置文件distributionUrl=gradle-4.6-all.zip即可。

4.编译成功后,去对应的目录下找对应模式的安装包即可

5.Azkaban solo server环境部署及快速入门:

启动azkaban-solo-server在bin目录下执行start-solo.sh,通过jps查看有AzkabanSingleServer

HDFS作业在Azkaban中的使用

使用hadoop fs -mkdir /azkban创建文件夹

创建一个 vi hdfs.job

内容:

type=command

command=/home/hadoop/app/hadoop-2.6.0-cdh5.7.0/bin/hadoop fs -ls /

打成zip包:zip -r hdfs.zip hdfs.job,然后在azkaban进行提交

Hive作业在Azkaban中的使用

启动hive后创建表

加载数据到hive表:load data local inpath '/home/hadoop/data/emp.txt' overwrite into table emp

创建一个 vi hive.job

内容:

type=command

command=/home/hadoop/app/hive-1.1.0-cdh5.7.0/bin/hive -f ‘test.sql’

Spark Local模式搭建 :在bin目录下执行spark-shell --master local[2],后会有scala命令提示符
Spark Standalone模式环境搭建:1个maste+n个worker在spark-env.sh的配置
1.SPARK_MASTER_HOST=主机名
2.SPARK_WORKER_CORES=2,一个从节点或计算节点分2个core给spark使用    ##WorkNode分出几核给spark使用
3.SPARK_WORKER_MEMORY=2g,一共2个core和2个内存   ##WorkNode分出多少内存给spark使用
4.SPARK_WORKER_INSTANCES=1,worker当中启动几个实例,启动个实例     ##WorkNode使用几个spark实例,一般一个就行了

spark SQL
thriftserver&beeline的使用
1.启动thriftserver后在主机名+端口号,hadoop000:10000 可以看到多出来SQL好JDBC/ODBC Server,默认端口是10000可以修改
2.启动beeline,通过beeline -u jdbc:hive2://localhost:10000 -n(是当前机器的用户名)
3.修改thriftserver端口
./start-thriftserver.sh --master local[2] --jar ~/software/mysql-connector-java-5.1.27-bin.jar --hiveconf .server2.thrift.port=14000

spark-shell

在hive中的数据表如何通过sparksql进行访问?
需要启动spark-shell进行访问
spark-shell master local[2]
启动后在scala命令提示符输入:spark.sql("show tables").show,没有进行配置不会存在数据,如果要访问hive里面的数据需要2项配置
1.hive-site.xml的配置,需要将hive目录下conf文件夹下的hive-site.xml复制到spark安装目录下的conf文件夹下,会出错将msql的jar包传到classpath就可以
2../spark-shell --master local[2] --jars ~/software/mysql-connector-java-5.1.27-bin.jar
会出现版本检测在hive-site.xml添加验证为false

spark-sql执行方式:./spark-sql --master local[2] --jars ~/software/mysql-connector-java-5.1.27-bin.jar,语法和sql一样,访问hadoop000:4040和spark-shell一样

thriftserver和sparksql,spark-shell有什么区别
1.spark-shell或sparksql都是一个spark application,每次都需要重新申请资源
2.thriftserver不管启动多少个客户端(beeline/core)只要是连到一个server上永远是一个spark application。同时解决 数据共享数据的问题。

通过JDBC的方式访问
1.Driver
2.getConnection
3.执行sql语句

注意事项:在啊使用jdbc开发是一定要先启动thriftserver 否则会报连接异常的错

DataFrame在SparkCore里面是以RDD方式进行编程的,在SparkSQL里面是以DataFrame或DataSet方式进行编程

schema是什么?数据表的结构信息

A Dataset is a distributed collection of data:数据集是数据的分布式集合
A DataFrame is a Dataset organized into named columns:DataFrame以列的形式构成的分布式数据集,按照列赋予不同的名称,在概念上等价于关系数据库中的表或R/Python中的数据帧

DataFrame和RDD对比
RDD如果用java或scala语言开发底层运行在jvm上,如果是python上有python的运行环境
DataFrame无论是用java/scala/python开发都会转成logic plan

DataFrame与RDD互操作
1.使用反射推断模式:

前提是事先知道的字段,字段类型等信息

2.以编程方式指定模式:

如果反射不能满足编程要求(事先不知道列等信息)

sparkSQL的外部数据源API如何操作hive表里面的数据
1.读取数据spark.table(tableName)
2.将数据写回去df.write.saveAsTable(tablName)

PySpark笔记的更多相关文章

  1. Pyspark笔记一

    1. pyspark读csv文件后无法显示中文 #pyspark读取csv格式时,不能显示中文 df = spark.read.csv(r"hdfs://mymaster:8020/user ...

  2. Spark调研笔记第4篇 - PySpark Internals

    事实上.有两个名为PySpark的概念.一个是指Sparkclient内置的pyspark脚本.而还有一个是指Spark Python API中的名为pyspark的package. 本文仅仅对第1个 ...

  3. pyspark学习笔记

    记录一些pyspark常用的用法,用到的就会加进来 pyspark指定分区个数 通过spark指定最终存储文件的个数,以解决例如小文件的问题,比hive方便,直观 有两种方法,repartition, ...

  4. pyspark 学习笔记

    from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark S ...

  5. 【原】Learning Spark &lpar;Python版&rpar; 学习笔记&lpar;三&rpar;----工作原理、调优与Spark SQL

    周末的任务是更新Learning Spark系列第三篇,以为自己写不完了,但为了改正拖延症,还是得完成给自己定的任务啊 = =.这三章主要讲Spark的运行过程(本地+集群),性能调优以及Spark ...

  6. Spark&period;ML之PipeLine学习笔记

    地址: http://spark.apache.org/docs/2.0.0/ml-pipeline.html   Spark PipeLine 是基于DataFrames的高层的API,可以方便用户 ...

  7. Spark调研笔记第2篇 - 怎样通过Sparkclient向Spark提交任务

    在上篇笔记的基础上,本文介绍Sparkclient的基本配置及Spark任务提交方式. 1. Sparkclient及基本配置 从Spark官网下载的pre-built包中集成了Sparkclient ...

  8. Spark 基本函数学习笔记一

      Spark 基本函数学习笔记一¶ spark的函数主要分两类,Transformations和Actions. Transformations为一些数据转换类函数,actions为一些行动类函数: ...

  9. 大数据学习——spark笔记

    变量的定义 val a: Int = 1 var b = 2 方法和函数 区别:函数可以作为参数传递给方法 方法: def test(arg: Int): Int=>Int ={ 方法体 } v ...

随机推荐

  1. QT QToolBox类

    QToolBox类的创建 //drawer.h #ifndef DRAWER_H #define DRAWER_H #include <QToolBox> #include <QTo ...

  2. ASP&period;NET Web Api 实现数据的分页(转载)

    转载地址:http://www.cnblogs.com/fzrain/p/3542608.html 前言 这篇文章我们将使用不同的方式实现手动分页(关于高端大气上档次的OData本文暂不涉及,但有可能 ...

  3. Node&period;js 随记

    http://nodejs.org/  下载并安装 node.js 最新版本 运行cmd,输入npm install xxxxxx 回车,安装扩展模块,如:express,socket.io等 运行c ...

  4. python语言 第一个程序

    print("hello word!") # for i in range(1, 10):# for j in range(1, 10):# print(j, "x&qu ...

  5. nginx 前端POST请求405问题解决与排查过程

    问题描述 在请求时,f12提示POST请求报错405 环境描述 nginx转发至后端nginx,后端nginx转发至后端golang api接口 解决步骤 根据网上方法排查,发现80%以上无非就是以下 ...

  6. CentOS7为php7&period;2安装php-redis扩展

    先下载phpredis-develop cd /tmpwget https://codeload.github.com/phpredis/phpredis/zip/develop 安装unzip.zi ...

  7. Java的语法糖

    1.前言 本文记录内容来自<深入理解Java虚拟机>的第十章早期(编译期)优化其中一节内容,其他的内容个人觉得暂时不需要过多关注,比如语法.词法分析,语义分析和字节码生成的过程等.主要关注 ...

  8. linux学习笔记-conky配置开机启动方法

    我的邮箱地址:zytrenren@163.com欢迎大家交流学习纠错! 一.常用桌面的配置方法 创建启动文件并加入以下配置 ~/.config/autostart/conky.desktop [Des ...

  9. keycloak docker-compose 运行

    内容很简单,主要是搭建一个可运行的keycloak 环境,方便开发测试,同时支持数据库的持久化 docker-compose 文件 version: "3" services: a ...

  10. 20155315庄艺霖第三次作业之Linux初体验

    Linux初体验 安装Linux三两事 老师的作业要求基于VirtualBox安装Linux系统,我一开始下载了VB但是电脑运行不了,后来看网上的教程下载了VMware,才算开始了我的Linux之旅. ...