spark的任务执行流程解析

时间:2022-12-19 20:42:37

一、从架构上看Spark的Job工作(Master\Worker)

[睡着的水-hzjs-2016.8.18]

1.Master节点上是Master进程,主要是管理资源的,资源主要是内存和CPU。Master能够接收客户端发送的程序并且为程序进行注册。worker节点有worker进程,负责当前节点的内存和cpu的使用,spark是主从结构式架构。

2.Spark运行作业的方式有很多,最简单的是就是通过spark-shell ,每个程序的程序的ID是向master 注册的时候由master分配的。worker节点在执行程序工作的时候的core数量、内存的大小是在安装Spark集群的时候配置文件中我们自己配置的。当启动spark-shell 的时候并没有运行任何job计算任务,那Master有没有分配资源呢?是有的!

3.master分配资源的方式是粗粒度的。spark-shell 中默认的是没有任何的Job,有资源分配就是因为分配资源的方式粗粒度的,默认情况下,虽然没有job计算,只要提交了程序并注册了,就会分配资源。

4.分配资源默认的分配方式在每个workers上,为当前程序分配一个Executorbackend 进行,且默认情况下会最大化的使用cores和memory。

5.Executor一次性最多能运行多少个并发的task,取决于当前Executor 能够使用的work的core的数量

6.有多个Executor ,就能更多的获取内存跟core的数量,但是默认情况下,一个节点只有一个Executor.

7.出现OOM的解决方式之一就是加大分片的数量。

8.一个分片数据进哪一个Executor ,主要取决于数据的本地性。

9.Executor 的并行度是可以继承的,后面的任务并行数量会继承前面的task数量,如果不进行类似reduceByKey类似的操作。

10.reduceByKey 运行的时候会只有一个任务,并行分片唯一,一般是运行在数据量比较大的节点上,数据本地性线程并不关心运行 什么代码,线程是代表这资源,由于它不关心代码,所以task与Thread是解耦合的,所以Thread是可以复用的,


总结:当spark集群启动的时候,首先启动master进程,负责整个集群资源的管理和分配,并且接收作业的提交,为作业注册、分配资源及每个工作节点默认情况都会启动一个worker Process 来管理当前节点的cpu等计算资源(实际上是还是通过Master来管理的)。worker Process 它会向master汇报worker当前能够工作。当用户向Master提交作业的时候,master为会为程序分配ID,并分配资源,默认情况下会为当前的应用程序在每个worker Process 下面分配一个CoarseGrainedExecutorBackend进程,该进程默认情况下会最大化的使用当前节点上的内存和CPU。

流程图如下:

spark的任务执行流程解析


二、从依赖上看spark的job运行原理

两种依赖的示意图如下,窄依赖、宽依赖:

spark的任务执行流程解析

宽依赖与窄依赖的定义:

spark的任务执行流程解析

示例如下图:

spark的任务执行流程解析

如果是这样的复杂的依赖关系会产生很多问题,严重的影响程序的性能:

1、Task太大。遇到Shuffle级别的依赖关系(宽依赖)必须要计算依赖的RDD的所有的Partitions,并且是发生在一个Task中。

2、重复计算。如上图最终的结果有三个Partitions,所以因为Shuffle的缘由,计算了三次所有的依赖RDD.

3、假设考虑从后往前的依赖关系,我们设计算法的角度看那些多次用到就需要cache(),这会造成了存储的浪费。


-------虽然这个依赖有很多的内存浪费,计算重复等,,但是我们依然可以看出血统的影子pipeline(做函数展开),无法很好的实现是因为上述的假设的核心问题都是在遇到shuffle依赖的时候无法很好的进行pipeline.我们只能退而求其次,在遇到shuffle的时候我们就需要断开,这样一个个的Stage的划分就清楚了。窄依赖加入,宽依赖断开。

-------每个Stage里面的Task的数量是由最后一个Stage的Partition的数量决定的

-------再次思考pipeline,即使采用pipeline的方式,函数f对依赖的RDD中的数据集合操作也会有两种方式:

1,f(记录),f 作用于集合的每一条记录,每次只作用于一条数据(spark采用);

2,f(记录),f一次性作用于集合的全部数据;

-------Spark采用第一种方式的原因:、

1、无需等待,可以最大化的使用集群的计算资源。

2、减少OOM的发生

3、最大化的有利于并发

4、可以精确的控制每一个partiton本身极其内部的计算

5、基于lineage的算子流动式函数编程,节省了中间结果的产生,并且可以最快的恢复;

三、从物理执行的角度看Spark的job执行

------Spark Application 里面可以产生一个或者多个Job,例如spark-shell 默认启动的时候内部就没有Job,只是作为资源分配的程序,可以在spark-shell 里面写代码产生他若干个job,普通程序一般而言可以有不同的Action,每一个Action一般也会触发一个job

------Spark是MapReduce思想的一种更加精致和高效的实现,MapReduce有很多集体不同的实现,例如hadoop的MapReduce基本的计算流程如下:

1、首先是以JVM为对象的并发执行的Mappper,其中Map的执行会产生数据,输出数据经过Partitioner指定的规则,放到当地的文件系统中,然后经过Shuffle、sort、Affregate 变成reduce的输入,执行reduce产生最终的执行结果。但是在执行迭代的时候,由于每次都要将reduce的结果存入HDFS,下次计算还要取出,,,这样就造成了很多的局限性。

2、Spark 算法构造和物理执行时的核心思想之一就是:最大化pipeline(数据复用效果好)!基于Pipeline的思想,数据被使用的时候才开始计算,从数据流动的视觉来说,是数据流动到计算的位置!从逻辑的角度上来看是算子在数据上流带动。从算法构建的角度而言,是算子作用于数据,所以是算子在数据上流动。从物理执行的角度而言,是数据流动到计算的位置。

--------每个Stage中除了最后一个RDD算子是真实的以外,前面的算子都是假的。在中间它会进行算子合并。。。

--------由于计算的Lazy特性,导致计算从后往前回溯,形成Computing Chain ,导致的结果就是需要首先计算出具体一个Stage内部左侧的RDD中本次计算依赖的Partition(只是逻辑上思考的结果,在所有的发送给Executor之前,所有的算子已经合并成了一个,现实中就是一个函数).计算发生的位置在最后的RDD。

------从后往前回溯形成计算链条,实际执行肯定是从前往后执行的。

-----窄依赖的物理执行

一个Stage内部的RDD都是窄依赖,窄依赖计算本身是逻辑上看从Stage内部最左侧的RDD开始立即计算的,根据Computing Chain(从后往前回溯构建生成),数据(Record) 从一个计算步骤流动下一个结算步骤,以此类推,知道算到Stage内部最后一个RDD来产生计算结果。实际物理计算是让数据从前往后再算子上流动,知道不能流动时,就开始计算下一个Record。这就导致了一个比较好的方式:

后面的RDD对前面的RDD的依赖虽然是Partition级别的数据集合的依赖,但是并不需要父RDD把Partition中所有的Records计算完毕才整体往后流动数据进行计算,这就极大的提高了计算速率。

-----宽依赖的物理执行

必须等到依赖的父Stage中最后一个RDD全部数据彻底计算完毕,才能够经过shuffle来计算当前的Stage.