日期 |
版本 |
修订 |
审批 |
修订说明 |
2016.10.11 |
1.0 |
章鑫8 |
|
初始版本 |
|
|
|
|
|
1 简介
Spark是UC Berkeley AMP lab(加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark拥有Hadoop MapReduce所具有的有点,但不同于MapReduce的是job中间输出结果是可以保存在内存中的,从而不再需要读写hdfs,很大程度上提高了计算速度,因此Spark能够更好的适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。
Spark使用Scala语言进行实现,它是一种面向对象、函数式编程语言,能够像操作本地集合对象一样轻松地操作分布式数据集。在Spark官网上介绍,它具有运行速度快、易用性好、通用性好、通用性强和随处运行等特点。
2 架构原理
Spark只是一个计算框架,它本身并不包括对数据的存储,这一点与实时分布式系统
Storm是一样的。Spark是一个可以基于hadoop的hdfs,也可以基于hbase之上的处理框架,图1是spark的框架(最新的应该还有GraphX模块):
图1 Spark框架图
2.1 适用场景
目前的大数据处理场景一般有一下几种类型:
1、复杂的批量处理(batch data processing),偏重点在于海量数据的处理能力,至于速度,不太关注,通常时间可能是在数十分钟到数小时等
2、基于历史数据的交互式查询(interactive query),通常的时间在数十秒到数十分钟之间
3、基于实时数据流的数据处理(Streaming dataprocessing),通常在数百毫秒到数秒之间
目前对上述三种场景需求,都有比较成熟的处理框架,第一种可以用hadoop的mapreduce来进行批量海量数据处理,而impala可以进行交互式查询,对于实时流式数据处理,有比较成熟的storm分布式处理框架。但三者都是比较独立,各自维护一套成本比较高,而Spark的出现,正好统一解决了上述三种需求。
简单总结,Spark有一下几种使用场景:
1、 Spark是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就相对较小
2、 由于RDD的特性,Spark不适用于那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合
3、 数据量不是特别大,但又有统计分析的需要,又有实时需求
4、 总的来说Spark的适用面比较广泛且比较通用
2.2 使用特点
1、 运行速度快
Spark拥有DAG执行引擎,支持在内存中对数据进行迭代计算。官方提供的数据表明,
如果数据由磁盘读取,速度是HadoopMapReduce的10倍以上,如果数据从内存中读取,速度可以高达100多倍,如图2所示。
图2 hadoop与spark速度对比
2、 易用性好
Spark不仅支持Scala编写应用程序,而且支持java和Python等语言进行编写,特别是
Scala是一种高效、可拓展的语言,能够用简洁的代码处理较为复杂的处理工作。
3、 通用性强
Spark生态圈即BDAS(伯克利数据分析栈)包含了SparkCore、Spark SQL、SparkStreaming、MLLib和GraphX等组件,其中Spark Core提供内存计算框架、SparkSteaming用来处理实时处理应用、Spark SQL用来即时查询、MLLib或MLbase用于机器学习以及GraphX的图处理,它们都是由AMP实验室提供,能够无缝的集成并提供一站式解决平台,Spark生态圈如图3所示。
图3 Spark生态圈
4、 随处运行
Spark具有很强的适应性,能够读取HDFS、Cassandra、Hbase、S3和Techyon为持久层
读写原生数据,能够以Mesos、YARN和自身携带的Standalone作为资源管理器调度job,来完成Spark应用程序的计算,具体如图4所示。
图4 Spark运行环境
2.3 Spark与Hadoop的差异
Spark是在借鉴了MapReduce之上发展而来的,继承了其分布式并行计算的优点并改进了MapReduce明显的缺陷,具体如下:
首先,Spark把中间数据放到内存中,迭代运算效率高。MapReduce中计算结果需要落地,保存到磁盘上,这样势必会影响整体速度,而Spark支持DAG图的分布式并行计算的编程框架,减少了迭代过程中数据的落地,提高了处理效率。
其次,Spark容错性高。Spark引进了弹性分布式数据集RDD (ResilientDistributed Dataset)的抽象,它是分布在一组节点中的只读对象集合,这些集合是弹性的,如果数据集一部分丢失,则可以根据“血统”(即充许基于数据衍生过程)对它们进行重建。另外在RDD计算时可以通过CheckPoint来实现容错,而CheckPoint有两种方式:CheckPoint Data,和Logging The Updates,用户可以控制采用哪种方式来实现容错。
最后,Spark更加通用。不像Hadoop只提供了Map和Reduce两种操作,Spark提供的数据集操作类型有很多种,大致分为:Transformations和Actions两大类。Transformations包括Map、Filter、FlatMap、Sample、GroupByKey、ReduceByKey、Union、Join、Cogroup、MapValues、Sort和PartionBy等多种操作类型,同时还提供Count, Actions包括Collect、Reduce、Lookup和Save等操作。
另外各个处理节点之间的通信模型不再像Hadoop只有Shuffle一种模式,用户可以命名、物化,控制中间结果的存储、分区等。
2.4 技术术语
1、 运行模式
运行环境 |
模式 |
描述 |
Local |
本地模式 |
常用于本地开发测试,本地还分为local单线程和local-cluster多线程; |
Standalone |
集群模式 |
典型的Mater/slave模式,不过也能看出Master是有单点故障的;Spark支持 ZooKeeper来实现HA |
On yarn |
集群模式 |
运行在yarn资源管理器框架之上,由yarn负责资源管理,Spark负责任务调度和计算 |
On mesos |
集群模式 |
运行在mesos资源管理器框架之上,由mesos负责资源管理,Spark负责任务调度和计算 |
On cloud |
集群模式 |
比如AWS的EC2,使用这个模式能很方便的访问Amazon的S3; Spark支持多种分布式存储系统:HDFS和S3 |
2、 常用术语
术语 |
描述 |
Application |
Spark的应用程序,包含一个Driver program和若干Executor |
SparkContext |
Spark应用程序的入口,负责调度各个运算资源,协调各个Worker Node上的Executor |
Driver Program |
运行Application的main()函数并且创建SparkContext |
Executor |
是为Application运行在Worker node上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上。 每个Application都会申请各自的Executor来处理任务 |
Cluster Manager |
在集群上获取资源的外部服务 (例如:Standalone、Mesos、Yarn) |
Worker Node |
集群中任何可以运行Application代码的节点,运行一个或多个Executor进程 |
Task |
运行在Executor上的工作单元 |
Job |
SparkContext提交的具体Action操作,常和Action对应 |
Stage |
每个Job会被拆分很多组task,每组任务被称为Stage,也称TaskSet |
RDD |
是Resilient distributed datasets的简称,中文为弹性分布式数据集;是Spark最核心的模块和类 |
DAGScheduler |
根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler |
TaskScheduler |
将Taskset提交给Worker node集群运行并返回结果 |
Transformations |
是Spark API的一种类型,Transformation返回值还是一个RDD, 所有的Transformation采用的都是懒策略,如果只是将Transformation提交是不会执行计算的 |
Action |
是Spark API的一种类型,Action返回值不是一个RDD,而是一个scala集合;计算只有在Action被提交的时候计算才被触发。 |
2.5 模块介绍
2.5.1 Spark Core
前面介绍了Spark Core的基本情况,以下总结一下Spark内核架构:
Ø 提供了有向无环图(DAG)的分布式并行计算框架,并提供Cache机制来支持多次迭代计算或者数据共享,大大减少迭代计算之间读取数据局的开销,这对于需要进行多次迭代的数据挖掘和分析性能有很大提升
Ø 在Spark中引入了RDD (Resilient Distributed Dataset) 的抽象,它是分布在一组节点中的只读对象集合,这些集合是弹性的,如果数据集一部分丢失,则可以根据“血统”对它们进行重建,保证了数据的高容错性
Ø 移动计算而非移动数据,RDD Partition可以就近读取分布式文件系统中的数据块到各个节点内存中进行计算
Ø 使用多线程池模型来减少task启动开稍
Ø 采用容错的、高可伸缩性的akka作为通讯框架
2.5.2 Spark Streaming
Spark Streaming是一个对实时数据流进行高通量、容错处理的流式处理系统,可以对多种数据源(如Kafka、Flume、Twitter、Zero和TCP 套接字)进行类似Map、Reduce和Join等复杂操作,并将结果保存到外部文件系统、数据库或应用到实时仪表盘。
Spark Streaming构架:
计算流程:Spark Streaming是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是Spark Core,也就是把Spark Streaming的输入数据按照batch size(如1秒)分成一段一段的数据(DiscretizedStream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),然后将Spark Streaming中对DStream的Transformation操作变为针对Spark中对RDD的Transformation操作,将RDD经过操作变成中间结果保存在内存中。整个流式计算根据业务的需求可以对中间的结果进行叠加或者存储到外部设备。下图显示了Spark Streaming的整个流程。
图5 Spark Streaming构架
容错性:对于流式计算来说,容错性至关重要。首先我们要明确一下Spark中RDD的容错机制。每一个RDD都是一个不可变的分布式可重算的数据集,其记录着确定性的操作继承关系(lineage),所以只要输入数据是可容错的,那么任意一个RDD的分区(Partition)出错或不可用,都是可以利用原始输入数据通过转换操作而重新算出的。
对于Spark Streaming来说,其RDD的传承关系如下图所示,图中的每一个椭圆形表示一个RDD,椭圆形中的每个圆形代表一个RDD中的一个Partition,图中的每一列的多个RDD表示一个DStream(图中有三个DStream),而每一行最后一个RDD则表示每一个Batch Size所产生的中间结果RDD。我们可以看到图中的每一个RDD都是通过lineage相连接的,由于Spark Streaming输入数据可以来自于磁盘,例如HDFS(多份拷贝)或是来自于网络的数据流(Spark Streaming会将网络输入数据的每一个数据流拷贝两份到其他的机器)都能保证容错性,所以RDD中任意的Partition出错,都可以并行地在其他机器上将缺失的Partition计算出来。这个容错恢复方式比连续计算模型(如Storm)的效率更高。
图6 Spark Streaming中RDD的lineage关系图
实时性:对于实时性的讨论,会牵涉到流式处理框架的应用场景。SparkStreaming将流式计算分解成多个SparkJob,对于每一段数据的处理都会经过Spark DAG图分解以及Spark的任务集的调度过程。对于目前版本的Spark Streaming而言,其最小的Batch Size的选取在0.5~2秒钟之间(Storm目前最小的延迟是100ms左右),所以Spark Streaming能够满足除对实时性要求非常高(如高频实时交易)之外的所有流式准实时计算场景。
扩展性与吞吐量:Spark目前在EC2上已能够线性扩展到100个节点(每个节点4Core),可以以数秒的延迟处理6GB/s的数据量(60M records/s),其吞吐量也比流行的Storm高2~5倍,图7是Berkeley利用WordCount和Grep两个用例所做的测试,在Grep这个测试中,SparkStreaming中的每个节点的吞吐量是670krecords/s,而Storm是115k records/s。
图7 Spark Streaming与Storm吞吐量比较图
2.5.3 Spark SQL
Shark是SparkSQL的前身,它发布于3年前,那个时候Hive可以说是SQL on Hadoop的唯一选择,负责将SQL编译成可扩展的MapReduce作业,鉴于Hive的性能以及与Spark的兼容,Shark项目由此而生。
Shark即Hive on Spark,本质上是通过Hive的HQL解析,把HQL翻译成Spark上的RDD操作,然后通过Hive的metadata获取数据库里的表信息,实际HDFS上的数据和文件,会由Shark获取并放到Spark上运算。Shark的最大特性就是快和与Hive的完全兼容,且可以在shell模式下使用rdd2sql()这样的API,把HQL得到的结果集,继续在scala环境下运算,支持自己编写简单的机器学习或简单分析处理函数,对HQL结果进一步分析计算。
在2014年7月1日的Spark Summit上,Databricks宣布终止对Shark的开发,将重点放到Spark SQL上。Databricks表示,Spark SQL将涵盖Shark的所有特性,用户可以从Shark 0.9进行无缝的升级。在会议上,Databricks表示,Shark更多是对Hive的改造,替换了Hive的物理执行引擎,因此会有一个很快的速度。然而,不容忽视的是,Shark继承了大量的Hive代码,因此给优化和维护带来了大量的麻烦。随着性能优化和先进分析整合的进一步加深,基于MapReduce设计的部分无疑成为了整个项目的瓶颈。因此,为了更好的发展,给用户提供一个更好的体验,Databricks宣布终止Shark项目,从而将更多的精力放到SparkSQL上。
Spark SQL允许开发人员直接处理RDD,同时也可查询例如在Apache Hive上存在的外部数据。SparkSQL的一个重要特点是其能够统一处理关系表和RDD,使得开发人员可以轻松地使用SQL命令进行外部查询,同时进行更复杂的数据分析。除了Spark SQL外,Michael还谈到Catalyst优化框架,它允许Spark SQL自动修改查询方案,使SQL更有效地执行。
还有Shark的作者是来自中国的博士生辛湜(Reynold Xin),也是Spark的核心成员,具体信息可以看他的专访http://www.****.net/article/2013-04-26/2815057-Spark-Reynold。
Spark SQL的特点:
Ø 引入了新的RDD类型SchemaRDD,可以象传统数据库定义表一样来定义SchemaRDD,SchemaRDD由定义了列数据类型的行对象构成。SchemaRDD可以从RDD转换过来,也可以从Parquet文件读入,也可以使用HiveQL从Hive中获取
Ø 内嵌了Catalyst查询优化框架,在把SQL解析成逻辑执行计划之后,利用Catalyst包里的一些类和接口,执行了一些简单的执行计划优化,最后变成RDD的计算
Ø 在应用程序中可以混合使用不同来源的数据,如可以将来自HiveQL的数据和来自SQL的数据进行Join操作
图8 hive和shark架构图
Shark的出现使得SQL-on-Hadoop的性能比Hive有了10-100倍的提高,那么,摆脱了Hive的限制,SparkSQL的性能又有怎么样的表现呢?虽然没有Shark相对于Hive那样瞩目地性能提升,但也表现得非常优异,如图9所示:
图9 SparkSQL性能
为什么sparkSQL的性能会得到怎么大的提升呢?主要sparkSQL在下面几点做了优化:
1、内存列存储(In-Memory Columnar Storage) sparkSQL的表数据在内存中存储不是采用原生态的JVM对象存储方式,而是采用内存列存储;
2、字节码生成技术(Bytecode Generation) Spark1.1.0在Catalyst模块的expressions增加了codegen模块,使用动态字节码生成技术,对匹配的表达式采用特定的代码动态编译。另外对SQL表达式都作了CG优化, CG优化的实现主要还是依靠Scala2.10的运行时放射机制(runtime reflection);
3、Scala代码优化 SparkSQL在使用Scala编写代码的时候,尽量避免低效的、容易GC的代码;尽管增加了编写代码的难度,但对于用户来说接口统一。
2.5.4 BlinkDB
BlinkDB 是一个用于在海量数据上运行交互式 SQL 查询的大规模并行查询引擎,它允许用户通过权衡数据精度来提升查询响应时间,其数据的精度被控制在允许的误差范围内。为了达到这个目标,BlinkDB 使用两个核心思想:
Ø 一个自适应优化框架,从原始数据随着时间的推移建立并维护一组多维样本;
Ø 一个动态样本选择策略,选择一个适当大小的示例基于查询的准确性和(或)响应时间需求。
和传统关系型数据库不同,BlinkDB是一个很有意思的交互式查询系统,就像一个跷跷板,用户需要在查询精度和查询时间上做一权衡;如果用户想更快地获取查询结果,那么将牺牲查询结果的精度;同样的,用户如果想获取更高精度的查询结果,就需要牺牲查询响应时间。用户可以在查询的时候定义一个失误边界,BlinkDB架构如图10所示。
图10 BlinkDB架构
2.5.5 MLBase/MLlib
MLBase是Spark生态圈的一部分专注于机器学习,让机器学习的门槛更低,让一些可能并不了解机器学习的用户也能方便地使用MLBase。MLBase分为四部分:MLlib、MLI、ML Optimizer和MLRuntime。
Ø ML Optimizer会选择它认为最适合的已经在内部实现好了的机器学习算法和相关参数,来处理用户输入的数据,并返回模型或别的帮助分析的结果;
Ø MLI 是一个进行特征抽取和高级ML编程抽象的算法实现的API或平台;
Ø MLlib是Spark实现一些常见的机器学习算法和实用程序,包括分类、回归、聚类、协同过滤、降维以及底层优化,该算法可以进行可扩充;
Ø MLRuntime 基于Spark计算框架,将Spark的分布式计算应用到机器学习领域。
图11 SparkMLLib结构图
总的来说,MLBase的核心是它的优化器,把声明式的Task转化成复杂的学习计划,产出最优的模型和计算结果。与其他机器学习Weka和Mahout不同的是:
Ø MLBase是分布式的,Weka是一个单机的系统
Ø MLBase是自动化的,Weka和Mahout都需要使用者具备机器学习技能,来选择自己想要的算法和参数来做处理
Ø MLBase提供了不同抽象程度的接口,让算法可以扩充
Ø MLBase基于Spark这个平台
2.5.6 GraphX
GraphX是Spark中用于图(e.g.,Web-Graphs and Social Networks)和图并行计算(e.g., PageRank and Collaborative Filtering)的API,可以认为是GraphLab(C++)和Pregel(C++)在Spark(Scala)上的重写及优化,跟其他分布式图计算框架相比,GraphX最大的贡献是,在Spark之上提供一站式数据解决方案,可以方便且高效地完成图计算的一整套流水作业。GraphX最先是伯克利AMPLAB的一个分布式图计算框架项目,后来整合到Spark中成为一个核心组件。
GraphX的核心抽象是Resilient Distributed Property Graph,一种点和边都带属性的有向多重图。它扩展了Spark RDD的抽象,有Table和Graph两种视图,而只需要一份物理存储。两种视图都有自己独有的操作符,从而获得了灵活操作和执行效率。如同Spark,GraphX的代码非常简洁。GraphX的核心代码只有3千多行,而在此之上实现的Pregel模型,只要短短的20多行。GraphX的代码结构整体下图所示,其中大部分的实现,都是围绕Partition的优化进行的。这在某种程度上说明了点分割的存储和相应的计算优化的确是图计算框架的重点和难点。
图12 GraphX结构
GraphX的底层设计有以下几个关键点:
1、对Graph视图的所有操作,最终都会转换成其关联的Table视图的RDD操作来完成。这样对一个图的计算,最终在逻辑上,等价于一系列RDD的转换过程。因此,Graph最终具备了RDD的3个关键特性:Immutable、Distributed和Fault-Tolerant。其中最关键的是Immutable(不变性)。逻辑上,所有图的转换和操作都产生了一个新图;物理上,GraphX会有一定程度的不变顶点和边的复用优化,对用户透明。
2、两种视图底层共用的物理数据,由RDD[Vertex-Partition]和RDD[EdgePartition]这两个RDD组成。点和边实际都不是以表Collection[tuple]的形式存储的,而是由VertexPartition/EdgePartition在内部存储一个带索引结构的分片数据块,以加速不同视图下的遍历速度。不变的索引结构在RDD转换过程中是共用的,降低了计算和存储开销。
3、图的分布式存储采用点分割模式,而且使用partitionBy方法,由用户指定不同的划分策略(PartitionStrategy)。划分策略会将边分配到各个EdgePartition,顶点Master分配到各个VertexPartition,EdgePartition也会缓存本地边关联点的Ghost副本。划分策略的不同会影响到所需要缓存的Ghost副本数量,以及每个EdgePartition分配的边的均衡程度,需要根据图的结构特征选取最佳策略。目前有EdgePartition2d、EdgePartition1d、RandomVertexCut和CanonicalRandomVertexCut这四种策略。
2.5.7 SparkR
SparkR是AMPLab发布的一个R开发包,使得R摆脱单机运行的命运,可以作为Spark的job运行在集群上,极大得扩展了R的数据处理能力。
SparkR的几个特性:
Ø 提供了Spark中弹性分布式数据集(RDD)的API,用户可以在集群上通过R shell交互性的运行Spark job
Ø 支持序化闭包功能,可以将用户定义函数中所引用到的变量自动序化发送到集群中
其他的机器上
Ø SparkR还可以很容易地调用R开发包,只需要在集群上执行操作前用includePackage读取R开发包就可以了,当然集群上要安装R开发包
图13 SparkR示意图
2.5.8 Tachyon
Tachyon是一个高容错的分布式文件系统,允许文件以内存的速度在集群框架中进行可靠的共享,就像Spark和 MapReduce那样。通过利用信息继承,内存侵入,Tachyon获得了高性能。Tachyon工作集文件缓存在内存中,并且让不同的 Jobs/Queries以及框架都能内存的速度来访问缓存文件。因此,Tachyon可以减少那些需要经常使用的数据集通过访问磁盘来获得的次数。Tachyon兼容Hadoop,现有的Spark和MR程序不需要任何修改而运行。
在2013年4月,AMPLab共享了其Tachyon 0.2.0 Alpha版本的Tachyon,其宣称性能为HDFS的300倍,继而受到了极大的关注。Tachyon的几个特性如下:
Ø JAVA-Like File API
Tachyon提供类似JAVA File类的API,
Ø 兼容性
Tachyon实现了HDFS接口,所以Spark和MR程序不需要任何修改即可运行。
Ø 可插拔的底层文件系统
Tachyon是一个可插拔的底层文件系统,提供容错功能。Tachyon将内存数据记录在底层文件系统。它有一个通用的接口,使得可以很容易的插入到不同的底层文件系统。目前支持HDFS,S3,GlusterFS和单节点的本地文件系统,以后将支持更多的文件系统。
3 运行架构
3.1 编程模型
3.1.1 技术术语
Ø Application
Spark Application的概念和Hadoop MapReduce中的类似,指的是用户编写的Spark应用程序,包含了一个Driver功能的代码和分布在集群中多个节点上运行的Executor代码;
Ø Driver
Spark中的Driver即运行上述Application的main()函数并且创建SparkContext,其中创建SparkContext的目的是为了准备Spark应用程序的运行环境。在Spark中由SparkContext负责和ClusterManager通信,进行资源的申请、任务的分配和监控等;当Executor部分运行完毕后,Driver负责将SparkContext关闭。通常用SparkContext代表Driver;
Ø Executor
Application运行在Worker 节点上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上,每个Application都有各自独立的一批Executor。在Spark on Yarn模式下,其进程名称为CoarseGrainedExecutorBackend,类似于Hadoop MapReduce中的YarnChild。一个CoarseGrainedExecutorBackend进程有且仅有一个executor对象,它负责将Task包装成taskRunner,并从线程池中抽取出一个空闲线程运行Task。每个CoarseGrainedExecutorBackend能并行运行Task的数量就取决于分配给它的CPU的个数了;
Ø Cluster Manager
指的是在集群上获取资源的外部服务,目前有:
Standalone:Spark原生的资源管理,由Master负责资源的分配;
Hadoop Yarn:由YARN中的ResourceManager负责资源的分配;
Ø Worker
集群中任何可以运行Application代码的节点,类似于YARN中的NodeManager节点。在Standalone模式中指的就是通过Slave文件配置的Worker节点,在Spark on Yarn模式中指的就是NodeManager节点;
Ø 作业(Job)
包含多个Task组成的并行计算,往往由Spark Action催生,一个JOB包含多个RDD及作用于相应RDD上的各种Operation;
Ø 阶段(Stage)
每个Job会被拆分很多组Task,每组任务被称为Stage,也可称TaskSet,一个作业分为多个阶段;
Ø 任务(Task)
被送到某个Executor上的工作任务;
上述术语涉及到的实例图如图14所示:
图14 Spark运行原理图
1.1.1 模型组成
Spark应用程序可分两部分:Driver部分和Executor部分,如图15所示。
图15 模型组成图
1、Driver部分
Driver部分主要是对SparkContext进行配置、初始化以及关闭。初始化SparkContext是为了构建Spark应用程序的运行环境,在初始化SparkContext,要先导入一些Spark的类和隐式转换;在Executor部分运行完毕后,需要将SparkContext关闭。
2、Executor部分
Spark应用程序的Executor部分是对数据的处理,数据分三种:
a)原生数据
包含原生的输入数据和输出数据。
Ø 输入数据
对于输入原生数据,Spark目前提供了两种:
² Scala集合数据集:如Array(1,2,3,4,5),Spark使用parallelize方法转换成RDD
² Hadoop数据集:Spark支持存储在hadoop上的文件和hadoop支持的其他文件系统,如本地文件、HBase、SequenceFile和Hadoop的输入格式。例如Spark使用txtFile方法可以将本地文件或HDFS文件转换成RDD
Ø 输出数据
对于输出数据,Spark除了支持以上两种数据,还支持scala标量。
² 生成Scala标量数据,如count(返回RDD中元素的个数)、reduce、fold/aggregate;返回几个标量,如take(返回前几个元素)
² 生成Scala集合数据集,如collect(把RDD中的所有元素倒入 Scala集合类型)、lookup(查找对应key的所有值)
² 生成hadoop数据集,如saveAsTextFile、saveAsSequenceFile
b)RDD
RDD具体在下一节中详细描述,RDD提供了四种算子:
Ø 输入算子:将原生数据转换成RDD,如parallelize、txtFile等
Ø 转换算子:最主要的算子,是Spark生成DAG图的对象,转换算子并不立即执行,在触发行动算子后再提交给driver处理,生成DAG图 --> Stage --> Task --> Worker执行。
Ø 缓存算子:对于要多次使用的RDD,可以缓冲加快运行速度,对重要数据可以采用多备份缓存。
Ø 行动算子:将运算结果RDD转换成原生数据,如count、reduce、collect、saveAsTextFile等。
c)共享变量
在Spark运行时,一个函数传递给RDD内的patition操作时,该函数所用到的变量在每个运算节点上都复制并维护了一份,并且各个节点之间不会相互影响。但是在Spark Application中,可能需要共享一些变量,提供Task或驱动程序使用。Spark提供了两种共享变量:
Ø 广播变量(Broadcast Variables)
可以缓存到各个节点的共享变量,通常为只读
– 广播变量缓存到各个节点的内存中,而不是每个 Task
– 广播变量被创建后,能在集群中运行的任何函数调用
– 广播变量是只读的,不能在被广播后修改
– 对于大数据集的广播, Spark 尝试使用高效的广播算法来降低通信成本
使用方法:
valbroadcastVar = sc.broadcast(Array(1, 2, 3))
Ø 累计器
只支持加法操作的变量,可以实现计数器和变量求和。用户可以调用SparkContext.accumulator(v)创建一个初始值为v的累加器,而运行在集群上的Task可以使用“+=”操作,但这些任务却不能读取;只有驱动程序才能获取累加器的值。
使用方法:
val accum = sc.accumulator(0) sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum + = x) accum.value val num=sc.parallelize(1 to 100) |
3.2 RDD
3.2.1 术语定义
Ø 弹性分布式数据集(RDD): Resillient Distributed Dataset,Spark的基本计算单元,可以通过一系列算子进行操作(主要有Transformation和Action操作);
Ø 有向无环图(DAG):Directed Acycle graph,反应RDD之间的依赖关系;
Ø 有向无环图调度器(DAG Scheduler):根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler;
Ø 任务调度器(Task Scheduler):将Taskset提交给worker(集群)运行并回报结果;
Ø 窄依赖(Narrow dependency):子RDD依赖于父RDD中固定的datapartition;
Ø 宽依赖(Wide Dependency):子RDD对父RDD中的所有datapartition都有依赖。
3.2.2 RDD概念
RDD是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现。RDD是Spark最核心的东西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现。RDD必须是可序列化的。RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作。这对于迭代运算比较常见的机器学习算法, 交互式数据挖掘来说,效率提升非常大。
RDD 最适合那种在数据集上的所有元素都执行相同操作的批处理式应用。在这种情况下, RDD 只需记录血统中每个转换就能还原丢失的数据分区,而无需记录大量的数据操作日志。所以 RDD不适合那些需要异步、细粒度更新状态的应用,比如Web应用的存储系统,或增量式的Web爬虫等。对于这些应用,使用具有事务更新日志和数据检查点的数据库系统更为高效。
1、RDD的特点
Ø 来源:一种是从持久存储获取数据,另一种是从其他RDD生成
Ø 只读:状态不可变,不能修改
Ø 分区:支持元素根据 Key 来分区 ( Partitioning ) ,保存到多个结点上,还原时只会重新计算丢失分区的数据,而不会影响整个系统
Ø 路径:在 RDD 中叫世族或血统 ( lineage ) ,即 RDD 有充足的信息关于它是如何从其他 RDD 产生而来的
Ø 持久化:可以控制存储级别(内存、磁盘等)来进行持久化
Ø 操作:丰富的动作 ( Action ) ,如Count、Reduce、Collect和Save 等
2、RDD基础数据类型
目前有两种类型的基础RDD:并行集合(Parallelized Collections):接收一个已经存在的Scala集合,然后进行各种并行计算。Hadoop数据集(Hadoop Datasets):在一个文件的每条记录上运行函数。只要文件系统是HDFS,或者hadoop支持的任意存储系统即可。这两种类型的RDD都可以通过相同的方式进行操作,从而获得子RDD等一系列拓展,形成lineage血统关系图。
a)并行化集合
并行化集合是通过调用SparkContext的parallelize方法,在一个已经存在的Scala集合上创建的(一个Seq对象)。集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集。例如,下面的解释器输出,演示了如何从一个数组创建一个并行集合。
例如:val rdd = sc.parallelize(Array(1to 10)) 根据能启动的executor的数量来进行切分多个slice,每一个slice启动一个Task来进行处理。val rdd = sc.parallelize(Array(1 to 10), 5) 指定了partition的数量
b)Hadoop数据集
Spark可以将任何Hadoop所支持的存储资源转化成RDD,如本地文件(需要网络文件系统,所有的节点都必须能访问到)、HDFS、Cassandra、HBase、Amazon S3等,Spark支持文本文件、SequenceFiles和任何Hadoop InputFormat格式。
(1)使用textFile()方法可以将本地文件或HDFS文件转换成RDD
支持整个文件目录读取,文件可以是文本或者压缩文件(如gzip等,自动执行解压缩并加载数据)。如textFile(”file:///dfs/data”)
支持通配符读取,例如:
val rdd1 = sc.textFile("file:///root/access_log/access_log*.filter"); val rdd2=rdd1.map(_.split("t")).filter(_.length==6) rdd2.count() ...... 14/08/20 14:44:48 INFO HadoopRDD: Input split: file:/root/access_log/access_log.20080611.decode.filter:134217728+20705903 ...... |
textFile()可选第二个参数slice,默认情况下为每一个block分配一个slice。用户也可以通过slice指定更多的分片,但不能使用少于HDFS block的分片数。
(2)使用wholeTextFiles()读取目录里面的小文件,返回(用户名、内容)对
(3)使用sequenceFile[K,V]()方法可以将SequenceFile转换成RDD。SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。
(4)使用SparkContext.hadoopRDD方法可以将其他任何Hadoop输入类型转化成RDD使用方法。一般来说,HadoopRDD中每一个HDFS block都成为一个RDD分区。
此外,通过Transformation可以将HadoopRDD等转换成FilterRDD(依赖一个父RDD产生)和JoinedRDD(依赖所有父RDD)等。
3、例子:控制台日志挖掘
假设网站中的一个WebService出现错误,我们想要从数以TB的HDFS日志文件中找到问题的原因,此时我们就可以用Spark加载日志文件到一组结点组成集群的RAM中,并交互式地进行查询。图16是代码示例:
图16 示例代码
首先行 1从 HDFS 文件中创建出一个RDD,而行 2 则衍生出一个经过某些条件过滤后的RDD。行3将这个RDD errors 缓存到内存中,然而第一个RDD lines 不会驻留在内存中。这样做很有必要,因为errors可能非常小,足以全部装进内存,而原始数据则会非常庞大。经过缓存后,现在就可以反复重用errors数据了。我们这里做了两个操作,第一个是统计errors中包含MySQL字样的总行数,第二个则是取出包含HDFS字样的行的第三列时间,并保存成一个集合。
图17 示例DAG图
这里要注意的是前面曾经提到过的Spark的延迟处理。Spark调度器会将filter和map这两个转换保存到管道,然后一起发送给结点去计算。
3.2.3 转换与操作
对于RDD可以有两种计算方式:转换(返回值还是一个RDD)与操作(返回值不是一个RDD)。
Ø 转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。
Ø 操作(Actions) (如:count, collect, save等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。
图18 转换与操作示意图
1、操作
reduce(func) |
通过函数func聚集数据集中的所有元素。Func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行 |
collect() |
在Driver的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,很可能会让Driver程序OOM |
count() |
返回数据集的元素个数 |
take(n) |
返回一个数组,由数据集的前n个元素组成。注意,这个操作目前并非在多个节点上,并行执行,而是Driver程序所在机器,单机计算所有的元素(Gateway的内存压力会增大,需要谨慎使用) |
first() |
返回数据集的第一个元素(类似于take(1) |
saveAsTextFile(path) |
将数据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者任何其它hadoop支持的文件系统。Spark将会调用每个元素的toString方法,并将它转换为文件中的一行文本 |
saveAsSequenceFile(path) |
将数据集的元素,以sequencefile的格式,保存到指定的目录下,本地系统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必须由key-value对组成,并都实现了Hadoop的Writable接口,或隐式可以转换为Writable(Spark包括了基本类型的转换,例如Int,Double,String等等) |
foreach(func) |
在数据集的每一个元素上,运行函数func。这通常用于更新一个累加器变量,或者和外部存储系统做交互 |
2、转换
map(func) |
返回一个新的分布式数据集,由每个原元素经过func函数转换后组成 |
filter(func) |
返回一个新的数据集,由经过func函数后返回值为true的原元素组成 |
flatMap(func) |
类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素) |
sample(withReplacement, frac, seed) |
根据给定的随机种子seed,随机抽样出数量为frac的数据 |
union(otherDataset) |
返回一个新的数据集,由原数据集和参数联合而成 |
groupByKey([numTasks]) |
在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task |
reduceByKey(func, [numTasks]) |
在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。 |
join(otherDataset, [numTasks]) |
在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集 |
groupWith(otherDataset, [numTasks]) |
在类型为(K,V)和(K,W)类型的数据集上调用,返回一个数据集,组成元素为(K, Seq[V], Seq[W]) Tuples。这个操作在其它框架,称为CoGroup |
cartesian(otherDataset) |
笛卡尔积。但在数据集T和U上调用时,返回一个(T,U)对的数据集,所有元素交互进行笛卡尔积。 |
3.2.4 依赖类型
在 RDD 中将依赖划分成了两种类型:窄依赖 (Narrow Dependencies) 和宽依赖 (Wide Dependencies) 。窄依赖是指父RDD的每个分区都只被子RDD的一个分区所使用 。相应的,那么宽依赖就是指父RDD的分区被多个子 RDD 的分区所依赖。例如,Map就是一种窄依赖,而Join则会导致宽依赖(除非父RDD是hash-partitioned,见图19)。
图19 宽窄依赖示意图
Ø 窄依赖(Narrow Dependencies )
² 父RDD的每个分区只被子RDD的一个分区所使用,子RDD分区通常对应常数个父RDD分区(O(1),与数据规模无关)
² 输入输出一对一的算子,且结果RDD 的分区结构不变,主要是map 、flatMap
² 输入输出一对一,但结果RDD 的分区结构发生了变化,如union 、coalesce
² 从输入中选择部分元素的算子,如filter 、distinct 、subtract 、sample
Ø 宽依赖(Wide Dependencies )
² 父RDD的每个分区可能被多个子RDD所使用,子RDD 分区通常对应所有的父RDD 分区(O(n),与数据规模有关)
² 对单个RDD 基于Key 进行重组和reduce,如groupByKey 、reduceByKey ;
² 对两个RDD 基于Key 进行join 和重组,如join
3.2.5 RDD缓存
Spark可以使用persist和cache方法将任意RDD缓存到内存、磁盘文件系统中。缓存是容错的,如果一个RDD分片丢失,可以通过构建它的transformation自动重构。被缓存的 RDD被使用的时,存取速度会被大大加速。一般的executor内存60%做cache,剩下的40%做task。
Spark中,RDD类可以使用cache()和persist()方法来缓存。cache()是persist()的特例,将该RDD缓存到内存中。而persist可以指定一个StorageLevel。StorageLevel的列表可以在StorageLevel 伴生单例对象中找到:
object StorageLevel { val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(false, false, true, false) // Tachyon } |
// 其中,StorageLevel 类的构造器参数如下:
class StorageLevelprivate( private var useDisk_ :Boolean, private var useMemory_ :Boolean, private var useOf Spark的不同StorageLevel,目的满足内存使用和CPU效率权衡上的不同需求。我们建议通过以下的步骤来进行选择:
Ø 如果你的RDDs可以很好的与默认的存储级别(MEMORY_ONLY)契合,就不需要做任何修改了。这已经是CPU使用效率最高的选项,它使得RDDs的操作尽可能的快;
Ø 如果不行,试着使用MEMORY_ONLY_SER并且选择一个快速序列化的库使得对象在有比较高的空间使用率的情况下,依然可以较快被访问;
Ø 尽可能不要存储到硬盘上,除非计算数据集的函数,计算量特别大,或者它们过滤了大量的数据。否则,重新计算一个分区的速度,和与从硬盘中读取基本差不多快;
Ø 如果你想有快速故障恢复能力,使用复制存储级别(例如:用Spark来响应web应用的请求)。所有的存储级别都有通过重新计算丢失数据恢复错误的容错机制,但是复制存储级别可以让你在RDD上持续的运行任务,而不需要等待丢失的分区被重新计算;
Ø 如果你想要定义你自己的存储级别(比如复制因子为3而不是2),可以使用StorageLevel 单例对象的apply()方法;
Ø 在不使用cached RDD的时候,及时使用unpersist方法来释放它。
3.3 运行流程
Spark运行基本流程如图20所示:
图20 Spark基本运行流程图
1、 构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(可以是Standalone、Mesos或者YARN)注册并申请运行Executor资源
2、 资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行情况将随着心跳发送到资源管理器上
3、 SparkContext构建成DAG,将DAG图分解成Stage,并把Task发送给Task Scheduler。Executor向SparkContext申请Task,Task Scheduler将发送给Executor运行同时SparkContext将应用程序代码发放给Executor
4、 Task在Executor上运行,运行完毕释放所有资源
Spark运行架构特点:
Ø 每个Application获取专属的executor进程,该进程在Application期间一直驻留,并以多线程方式运行tasks。这种Application隔离机制有其优势的,无论是从调度角度看(每个Driver调度它自己的任务),还是从运行角度看(来自不同Application的Task运行在不同的JVM中)。当然,这也意味着SparkApplication不能跨应用程序共享数据,除非将数据写入到外部存储系统
Ø Spark与资源管理器无关,只要能够获取executor进程,并能保持相互通信就可以了
Ø 提交SparkContext的Client应该靠近Worker节点(运行Executor的节点),最好是在同一个Rack里,因为SparkApplication运行过程中SparkContext和Executor之间有大量的信息交换;如果想在远程集群中运行,最好使用RPC将SparkContext提交给集群,不要远离Worker运行SparkContext
Ø Task采用了数据本地性和推测执行的优化机制
3.3.1 DAGScheduler
DAGScheduler把一个Spark作业转换成Stage的DAG(Directed Acyclic Graph有向无环图),根据RDD和Stage之间的关系找出开销最小的调度方法,然后把Stage以TaskSet的形式提交给TaskScheduler,下图展示了DAGScheduler的作用:
图21 DAGScheduler作用
3.3.2 TaskScheduler
DAGScheduler决定了运行Task的理想位置,并把这些信息传递给下层的TaskScheduler。此外,DAGScheduler还处理由于Shuffle数据丢失导致的失败,这有可能需要重新提交运行之前的Stage(非Shuffle数据丢失导致的Task失败由TaskScheduler处理)。
TaskScheduler维护所有TaskSet,当Executor向Driver发送心跳时,TaskScheduler会根据其资源剩余情况分配相应的Task。另外TaskScheduler还维护着所有Task的运行状态,重试失败的Task。图22展示了TaskScheduler的作用:
图22 TaskScheduler作用
在不同的运行模式下任务调度器具体为:
Ø Spark on Standalone模式为TaskScheduler
Ø YARN-Client模式为YarnClientClusterScheduler
Ø YARN-Cluster模式为YarnClusterScheduler
3.3.3 RDD运行原理
RDD在Spark架构中的运行步骤主要分为三步:
1、 创建 RDD 对象
2、 DAGScheduler模块介入运算,计算RDD之间的依赖关系。RDD之间的依赖关系就形成了DAG
3、 每一个JOB被分为多个Stage,划分Stage的一个主要依据是当前计算因子的输入是否是确定的,如果是则将其分在同一个Stage,避免多个Stage之间的消息传递开销
三个步骤的详细情况如图23所示:
图23 RDD运行步骤
以图24为例,一个按A-Z首字母分类,查找相同首字母下不同姓名总个数来看一下RDD的运行步骤。
图24 RDD示例
步骤1:创建RDD上面的例子去除最后一个collect是个动作,不会创建RDD之外,前面四个转换操作都会创建出新的RDD。因此第一步就是创建好所有RDD(内部的五项信息)。
步骤2:创建执行计划Spark会尽可能的管道化,并基于是否要重新组织数据来划分阶段(Stage),例如本例中的groupBy()转换就会将整个执行计划分成两阶段执行。最终会产生一个DAG(directed acyclic graph,有向无环图)作为逻辑执行计划,如图25所示。
图25 RDD示例
步骤 3 :调度任务,将各阶段划分成不同的任务(task),每个任务都是数据和计算的合体。在进行下一阶段前,当前阶段的所有任务都要执行完成。因为下一阶段的第一个转换一定是重新组织数据的,所以必须等当前阶段所有结果数据都计算出来了才能继续。
假设本例中的hdfs://names下有四个文件块,那么HadoopRDD中partitions就会有四个分区对应这四个块数据,同时preferedLocations会指明这四个块的最佳位置。现在,就可以创建出四个任务,并调度到合适的集群结点上。
图26 RDD示例
3.4 集群运行框架
Spark注重建立良好的生态系统,它不仅支持多种外部文件存储系统,提供了多种多样的集群运行模式。部署在单台机器上时,既可用本地(local)模式运行,也可以使用伪分布式模式来运行;当以分布式集群部署的时候,可以根据自己集群的实际情况选择Standalone模式(Spark自带的模式)、YARN-Client模式或者YARN-Cluster模式。Spark的各种运行模式虽然在启动方式、运行位置、调度策略上各有不同,但他们的目的基本都是一致的,就是在合适的位置安全可靠的根据用户的配置和Job的需要运行和管理Task。
3.4.1 Spark on Standalone
Standalone模式是Spark实现的资源调度框架,其主要的节点有Client节点、Master节点和Worker节点。其中Driver既可以运行在Master节点上,也可以运行在本地Client端。当使用Spark-shell交互式工具提交Spark的Job时,Driver在Master节点上运行;当使用Spark-summit工具提交Job或者在Eclipse、IDEA等平台上使用“new SparkConf.setManager(“spark://master:7077”)”方式运行Spark任务时,Driver是运行在本地Client端上的。
具体运行过程如下:
1.SparkContext连接到Master,向Master注册并申请资源(CPU Core 和Memory);
2.Master根据SparkContext的资源申请要求和Worker心跳周期内报告的信息决定在哪个Worker上分配资源,然后在该Worker上获取资源,然后启动StandaloneExecutorBackend;
3.StandaloneExecutorBackend向SparkContext注册;
4.SparkContext将Applicaiton代码发送给StandaloneExecutorBackend;并且SparkContext解析Applicaiton代码,构建DAG图,并提交给DAG Scheduler分解成Stage(当碰到Action操作时,就会催生Job;每个Job中含有1个或多个Stage,Stage一般在获取外部数据和shuffle之前产生),然后以Stage(或者称为TaskSet)提交给Task Scheduler,Task Scheduler负责将Task分配到相应的Worker,最后提交给StandaloneExecutorBackend执行;
5.StandaloneExecutorBackend会建立Executor线程池,开始执行Task,并向SparkContext报告,直至Task完成。
6.所有Task完成后,SparkContext向Master注销,释放资源。
运行示意图如图27所示。
图27 Spark on Standalone
3.4.2 Spark on YARN
YARN是一种统一资源管理机制,在其上面可以运行多套计算框架。目前的大数据技术世界,大多数公司除了使用Spark来进行数据计算之外,由于历史原因或者单方面业务处理的性能考虑而使用着其他的计算框架,比如MapReduce、Storm等计算框架。Spark基于此种情况开发了Spark on YARN的运行模式,由于借助了YARN良好的弹性资源管理机制,不仅部署Application更加方便,而且用户在YARN集群中运行的服务和Application的资源也完全隔离,更具实践应用价值的是YARN可以通过队列的方式,管理同时运行在集群的多个服务。
Sparkon YARN模式根据Driver在集群中的位置分为两种模式:一种是YARN-Client模式,另一种是YARN-Cluster(或称为YARN-Standalone模式)。
1、 YARN框架流程
任何框架与YARN的结合,都必须遵循YARN的开发模式。在分析Spark on YARN的实现细节之前,有必要先分析一下YARN框架的基本原理。
YARN框架的基本运行流程图如图28所示。
图28 YARN运行流程图
其中,ResourceManager负责将集群的资源分配给各个应用使用,而资源分配和调度的基本单位是Container,其中封装了机器资源,如内存、CPU、磁盘和网络等,每个任务会被分配一个Container,该任务只能在该Container中执行,并使用该Container封装的资源。NodeManager是一个个的计算节点,主要负责启动Application所需的Container,监控资源(内存、CPU、磁盘和网络等)的使用情况并将之汇报给ResourceManager。ResourceManager与NodeManager共同组成整个数据计算框架,ApplicationMaster与具体的Application相关,主要负责与ResourceManager协商以获取合适的Container,并跟踪这些Container的状态和监控其进度。
2、 YARN-Client
YARN-Client模式中,Driver在客户端本地运行,这种模式可以使得Spark Application
和客户端进行交互,因为Driver在客户端,所以可以通过webUI访问Driver的状态,默认是http://RMIPAdd:4040访问,而YARN通过http://RMIPAdd:8080访问。
YARN-Client的工作流程如图29所示:
图29 Spark-Client工作流程
(1)Spark YarnClient向YARN的ResourceManager申请启动Application Master。同时在SparkContext初始化中将创建DAGScheduler和TASKScheduler等,由于我们选择的是Yarn-Client模式,程序会选择YarnClientClusterScheduler和YarnClientSchedulerBackend;
(2)ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,与YARN-Cluster区别的是在该ApplicationMaster不运行SparkContext,只与SparkContext进行联系进行资源的分派;
(3)Client中的SparkContext初始化完毕后,与ApplicationMaster建立通讯,向ResourceManager注册,根据任务信息向ResourceManager申请资源(Container);
(4)一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,要求它在获得的Container中启动CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend启动后会向Client中的SparkContext注册并申请Task;
(5)Client中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向Driver汇报运行的状态和进度,以让Client随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
(6)应用程序运行完成后,Client的SparkContext向ResourceManager申请注销并关闭自己。
3、 YARN-Cluster
在YARN-Cluster模式中,当向YARN中提交一个应用程序后,YARN将分两个阶段运行该应用程序:第一阶段是把Spark的Driver作为一个ApplicationMaster在YARN集群中先启动;第二个阶段是由ApplicationMaster创建应用程序,然后为它向ResourceManager申请资源,并启动Executor来执行Task,同时监控它的整个运行过程,直到运行完成。
YARN-Cluster的工作流程分为以下几个步骤,如图30所示。
图30 YARN-Cluster工作流程
(1)Spark YarnClient向YARN中提交应用程序,包括ApplicationMaster程序、启动ApplicationMaster的命令、需要在Executor中运行的程序等;
(2)ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,其中ApplicationMaster进行SparkContext等的初始化;
(3)ApplicationMaster向ResourceManager注册,这样用户可以直接通过ResourceManage查看应用程序的运行状态,然后它将采用轮询的方式通过RPC协议为各个任务申请资源,并监控它们的运行状态直到运行结束;
(4)一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,要求它在获得的Container中启动启动CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend启动后会向ApplicationMaster中的SparkContext注册并申请Task。这一点和Standalone模式一样,只不过SparkContext在Spark Application中初始化时,使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler进行任务的调度,其中YarnClusterScheduler只是对TaskSchedulerImpl的一个简单包装,增加了对Executor的等待逻辑等;
(5)ApplicationMaster中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向ApplicationMaster汇报运行的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
(6)应用程序运行完成后,ApplicationMaster向ResourceManager申请注销并关闭自己。
4、 YARN-Client与YARN-Cluster的区别
理解YARN-Client和YARN-Cluster深层次的区别之前先搞清楚一个概念:
ApplicationMaster。在YARN中,每个Application实例都有一个ApplicationMaster进程,它是Application启动的第一个容器。它负责和ResourceManager打交道并请求资源,获取资源之后告诉NodeManager为其启动Container。从深层次的含义讲YARN-Cluster和YARN-Client模式的区别其实就是ApplicationMaster进程的区别。
(1) YARN-Cluster模式下,Driver运行在AM(ApplicationMaster)中,它负责向YARN申请资源,并监督作业的运行情况。当用户提交了作业之后,就可以关掉Client,作业会继续在YARN上运行,因而YARN-Cluster模式不适合运行交互类型的作业;如图31所示。
图31 YARN-Cluster模式
(2)YARN-Client模式下,ApplicationMaster仅仅向YARN请求Executor,Client会和请求的Container通信来调度他们工作,也就是Client不能离开。具体可见图32。
图32 YARN-Client模式
4 附录
参考资料:
Spark入门实战系列--1.Spark及其生态圈简介 - shishanyuan - 博客园
Spark入门实战系列--3.Spark编程模型(上)--编程模型及SparkShell实战 - shishanyuan - 博客园
Spark入门实战系列--4.Spark运行架构 - shishanyuan - 博客园