spark 笔记 8: Stage

时间:2021-04-03 23:17:32

spark 笔记 8:  Stage
Stage 是一组独立的任务,他们在一个job中执行相同的功能(function),功能的划分是以shuffle为边界的。DAG调度器以拓扑顺序执行同一个Stage中的task。
/**
* A stage is a set of independent tasks all computing the same function that need to run as part
* of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
* by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the
* DAGScheduler runs these stages in topological order.
*
* Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
* another stage, or a result stage, in which case its tasks directly compute the action that
* initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes
* that each output partition is on.
*
* Each Stage also has a jobId, identifying the job that first submitted the stage. When FIFO
* scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
* faster on failure.
*
* The callSite provides a location in user code which relates to the stage. For a shuffle map
* stage, the callSite gives the user code that created the RDD being shuffled. For a result
* stage, the callSite gives the user code that executes the associated action (e.g. count()).
*
* A single stage can consist of multiple attempts. In that case, the latestInfo field will
* be updated for each attempt.
*
*/
private[spark] class Stage(
val id: Int,
val rdd: RDD[_],
val numTasks: Int,
val shuffleDep: Option[ShuffleDependency[_, _, _]], // Output shuffle if stage is a map stage
val parents: List[Stage],
val jobId: Int,
val callSite: CallSite)
extends Logging {
重要属性:
val isShuffleMap = shuffleDep.isDefined
val numPartitions = rdd.partitions.size
val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
var numAvailableOutputs = 0

/** Set of jobs that this stage belongs to. */
val jobIds = new HashSet[Int]

/** For stages that are the final (consists of only ResultTasks), link to the ActiveJob. */
var resultOfJob: Option[ActiveJob] = None
var pendingTasks = new HashSet[Task[_]]
def addOutputLoc(partition: Int, status: MapStatus) {
/**
* Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
* task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
* The map output sizes are compressed using MapOutputTracker.compressSize.
*/
private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])





















spark 笔记 8: Stage的更多相关文章

  1. spark笔记 环境配置

    spark笔记 spark简介 saprk 有六个核心组件: SparkCore.SparkSQL.SparkStreaming.StructedStreaming.MLlib,Graphx Spar ...

  2. Spark 资源调度包 stage 类解析

    spark 资源调度包 Stage(阶段) 类解析 Stage 概念 Spark 任务会根据 RDD 之间的依赖关系, 形成一个DAG有向无环图, DAG会被提交给DAGScheduler, DAGS ...

  3. spark 笔记 15: ShuffleManager,shuffle map两端的stage/task的桥梁

    无论是Hadoop还是spark,shuffle操作都是决定其性能的重要因素.在不能减少shuffle的情况下,使用一个好的shuffle管理器也是优化性能的重要手段. ShuffleManager的 ...

  4. spark 笔记 13: 再看DAGScheduler,stage状态更新流程

    当某个task完成后,某个shuffle Stage X可能已完成,那么就可能会一些仅依赖Stage X的Stage现在可以执行了,所以要有响应task完成的状态更新流程. ============= ...

  5. 大数据学习——spark笔记

    变量的定义 val a: Int = 1 var b = 2 方法和函数 区别:函数可以作为参数传递给方法 方法: def test(arg: Int): Int=>Int ={ 方法体 } v ...

  6. spark 笔记 16: BlockManager

    先看一下原理性的文章:http://jerryshao.me/architecture/2013/10/08/spark-storage-module-analysis/ ,http://jerrys ...

  7. spark 笔记 9: Task/TaskContext

    DAGScheduler最终创建了task set,并提交给了taskScheduler.那先得看看task是怎么定义和执行的. Task是execution执行的一个单元. Task: execut ...

  8. spark 笔记 7: DAGScheduler

    在前面的sparkContex和RDD都可以看到,真正的计算工作都是同过调用DAGScheduler的runjob方法来实现的.这是一个很重要的类.在看这个类实现之前,需要对actor模式有一点了解: ...

  9. spark 笔记 2: Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing

    http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf  ucb关于spark的论文,对spark中核心组件RDD最原始.本质的理解, ...

随机推荐

  1. ASP.NET MVC之路由特性以及母版页呈现方式(十二)

    前言 这一节我们开始讲讲基础的东西也就是如题目所言,个人觉得当学习或者利用MVC时,必须得知道最新迭代版本新增了什么,至少得知道MVC 3.MVC 4或者MVC 5有什么区别,而不至于当利用到低版本时 ...

  2. MySql无限分类数据结构--预排序遍历树算法

    MySql无限分类数据结构--预排序遍历树算法 无限分类是我们开发中非常常见的应用,像论坛的的版块,CMS的类别,应用的地方特别多. 我们最常见最简单的方法就是在MySql里ID ,parentID, ...

  3. LINUX系统编程 由REDIS的持久化机制联想到的子进程退出的相关问题

    19:22:01 2014-08-27 引言: 以前对wait waitpid 以及exit这几个函数只是大致上了解,但是看REDIS的AOF和RDB 2种持久化时 均要处理子进程运行完成退出和父进程 ...

  4. sql server 修改字段大小

    alter table AAA ALTER COLUMN BBBvarchar(100);  这种语法只适合把字段往大了设置,  如果更新的字段大小比实际的要小时,并且数据库已存在比要更新的数据大时, ...

  5. Linux下安装oracle jdk

    从官网下载对应的 .tar.gz压缩文件. 在linux某个目录下解压 到根目录下的etc文件夹下编辑profile文件,在文件的末尾加上 JAVA_HOME=/usr/local/jdk1.8.0_ ...

  6. Sql 四大排名函数(ROW_NUMBER、RANK、DENSE_RANK、NTILE)简介

    排名函数是Sql Server2005新增的功能,下面简单介绍一下他们各自的用法和区别.我们新建一张Order表并添加一些初始数据方便我们查看效果. CREATE TABLE [dbo].[Order ...

  7. 使用Eclipse的代码追踪功能

    在使用Java编写复杂一些的程序时,你会不会常常对一层层的继承关系和一次次方法的调用感到迷惘呢?幸亏我们有了Eclipse这么好的IDE可以帮我们理清头绪--这就要使用Eclipse强大的代码追踪功能 ...

  8. json&pickle&shelve模块

    之前我们学习过用eval内置方法可以将一个字符串转成python对象,不过,eval方法是有局限性的,对于普通的数据类型,json.loads和eval都能用,但遇到特殊类型的时候,eval就不管用了 ...

  9. 【SDOI2009】Bill的挑战

    Description Sheng bill不仅有惊人的心算能力,还可以轻松地完成各种统计.在昨天的比赛中,你凭借优秀的程序与他打成了平局,这导致Sheng bill极度的不满.于是他再次挑战你.这次 ...

  10. JSP指令(page include taglib)

    JSP指令指示JSP转换器如何翻译JSP页面到Servlet:JSP指令用来设置整个JSP页面相关的属性,如网页编码方式.脚本语言等 JSP指令的格式: <%@ directive attrib ...