Job
Spark的整个作业体系中,处于顶层的是Job, Job和Spark中的Action是一一对应的,每一个Action都会触发一个Job的执行,这个Job包含的处理逻辑是Action以及Action之前的所有Transformation, 所有这些逻辑会被Spark转换成一张关于RDD的DAG(有向无环图),这个DAG也就是实际意义上的Job的执行计划。本文原文出处: 本文原文链接: http://blog.csdn.net/bluishglc/article/details/80653801 转载请注明出处。
Stage
Job的下一个层级是Stage,一个Stage是由一群task组成的,这些task执行的是完全一样的代码,分散到多个executor上执行。Stage对标的是shuffle, 或者叫宽依赖的transformation, 也就是每当在Job执行中遇到一个需要shuffle的transformation时,就将这个transformation和它之前的所有动作放到一个Stage里(当然,如果只action,自然也会触发一个stage),这样的安排是一种很自然的处理方式,究其本质,在一系列的分布式计算中,当遇到shuffle之前,所有的操作都可以在一个单一分区里独立完成,并不需要依赖到其他的分区(例如map,filter这类操作),这些操作可以放在一个流水线(pipline)中执行,这样有助于性能优化,而一但作业中遇到一个需要基于全体数据集(spark称之为all-to-all 操作)进行计算的操作(例如groupByKey, reduceByKey),就需要进行shuffle, shuffle会对全体数据重新洗牌并分区,所以后续的操作不可能和前面的操作放在同一个流水线上,所以每个stage是以一个shuffle来终结的。这里需要对前面提到的transformation的窄依赖和宽依赖进行一个说明:
- 窄依赖指的这个计算在每一个partition上都只依赖于partition自有的数据就可以完成,再直白地解释就是如果某种操作在每一个局部(each partition)做完就相当于全集做完,这就是窄依赖,例如map操作。
- 宽依赖指的这个计算只有在数据全集(all partitions)上执行才能得到结果,例如groupByKey等。
Task
Task是Spark执行作业的最小单位,一个Task只对应一个Partition, 前面提到过,对于同一个RDD下的某个transformation,它有多少个partition就会对应生成多少个task, 这些task执行的是完全一样的代码。所以控制一个Stage会产生多少个Task的唯一因素就是Spark配置了多个个partition,这个配置参数是spark.sql.shuffle.partitions
, 它的默认值是200,也就是默认一个stage总是产生200个task,对于一些只有一个节点的小集群来说,这个数值是比较大的,在性能调优时往往会调小。
一个“立体”的视图
需要注意的一点是Job->Stage和Stage->Task的切分的维度是不太一样的,如果我们把一个作业涉及到的一系列的操作(transformation和action)看成是横向排列的,那么Stage就是对作业的“垂直”切分,分解的依据就是遇到需要shuffle的transformaion或action就切一刀。而Stage->Task的切分则是一种“水平”切分了,每一个task要执行的操作就是Stage圈定的所有的操作,只是一个task基于一个partition的数据去执行这些操作,如果Spark配置的是200个分区,则一个Stage就会被“水平”切分成200个Task去并行的执行!
所以下图展示了本文自定义的“垂直”与“水平”切分的定义,这样,我们可以清晰地认识到Spark是如何将一系列连续的数据操作在一个分布式环境上“拆解”成了一个操作的“矩阵”,垂直方向上是可以在单一partition上连续执行的操作,水平方向是可以并行处理的多个分区。
注:上图由两张图拼接面来,一张是来自:https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-taskscheduler-tasks.html, 另一张来自:http://litaotao.github.io/deep-into-spark-exection-model