Spark SQL源代码分析之核心流程

时间:2022-06-13 13:26:35

/** Spark SQL源代码分析系列文章*/

自从去年Spark Submit 2013 Michael Armbrust分享了他的Catalyst,到至今1年多了,Spark SQL的贡献者从几人到了几十人,并且发展速度异常迅猛,究其原因,个人觉得有下面2点:

1、整合:将SQL类型的查询语言整合到 Spark 的核心RDD概念里。这样能够应用于多种任务,流处理,批处理,包含机器学习里都能够引入Sql。

    2、效率:由于Shark受到hive的编程模型限制,无法再继续优化来适应Spark模型里。

前一段时间測试过Shark,而且对Spark SQL也进行了一些測试,可是还是忍不住对Spark SQL一探到底,就从源码的角度来看一下Spark
SQL的核心运行流程吧。

一、引子

先来看一段简单的Spark SQL程序:

1. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
2. import sqlContext._
3.case class Person(name: String, age: Int)
4.val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
5.people.registerAsTable("people")
6.val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
7.teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

程序前两句1和2生成SQLContext,导入sqlContext以下的all,也就是执行SparkSQL的上下文环境。

程序3,4两句是载入数据源注冊table

第6句是真正的入口,是sql函数,传入一句sql,先会返回一个SchemaRDD。这一步是lazy的,直到第七句的collect这个action运行时,sql才会运行。

二、SQLCOntext

SQLContext是运行SQL的上下文对象,首先来看一下它Hold的有哪些成员:

Catalog  

 一个存储<tableName,logicalPlan>的map结构,查找关系的文件夹,注冊表,注销表,查询表和逻辑计划关系的类。

Spark SQL源代码分析之核心流程

SqlParser 

 Parse 传入的sql来对语法分词,构建语法树,返回一个logical plan

Spark SQL源代码分析之核心流程

Analyzer 

  logical plan的语法分析器

Spark SQL源代码分析之核心流程

Optimizer 

 logical Plan的优化器

Spark SQL源代码分析之核心流程

LogicalPlan 

逻辑计划,由catalyst的TreeNode组成,能够看到有3种语法树

Spark SQL源代码分析之核心流程

SparkPlanner 

包括不同策略的优化策略来优化物理运行计划

Spark SQL源代码分析之核心流程

QueryExecution 

sql运行的环境上下文

Spark SQL源代码分析之核心流程

就是这些对象组成了Spark SQL的执行时,看起来非常酷,有静态的metadata存储,有分析器、优化器、逻辑计划、物理计划、执行执行时。

那这些对象是怎么相互协作来运行sql语句的呢?

三、Spark SQL运行流程

话不多说,先上图,这个图我用一个在线作图工具process on话的,画的不好,图能达意即可:

Spark SQL源代码分析之核心流程



核心组件都是绿色的方框,每一步流程的结果都是蓝色的框框,调用的方法是橙色的框框。



先概括一下,大致的运行流程是:

Parse SQL -> Analyze Logical Plan -> Optimize Logical Plan -> Generate Physical Plan -> Prepareed Spark Plan -> Execute SQL -> Generate RDD



更详细的运行流程:

     sql or hql -> sql parser(parse)生成 unresolved logical plan -> analyzer(analysis)生成analyzed logical plan  -> optimizer(optimize)optimized logical plan -> spark planner(use strategies to plan)生成physical plan -> 採用不同Strategies生成spark
plan -> spark plan(prepare) prepared spark plan -> call toRDD(execute()函数调用) 运行sql生成RDD

3.1、Parse SQL

回到開始的程序,我们调用sql函数,事实上是SQLContext里的sql函数它的实现是new一个SchemaRDD,在生成的时候就调用parseSql方法了。

	  /**
* Executes a SQL query using Spark, returning the result as a SchemaRDD.
*
* @group userf
*/
def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))

结果是会生成一个逻辑计划

   @transient
protected[sql] val parser = new catalyst.SqlParser protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)

3.2、Analyze to Execution

当我们调用SchemaRDD里面的collect方法时,则会初始化QueryExecution,開始启动运行。

 override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()

我们能够非常清晰的看到运行步骤:

protected abstract class QueryExecution {
def logical: LogicalPlan lazy val analyzed = analyzer(logical) //首先分析器会分析逻辑计划
lazy val optimizedPlan = optimizer(analyzed) //随后优化器去优化分析后的逻辑计划
// TODO: Don't just pick the first one...
lazy val sparkPlan = planner(optimizedPlan).next() //依据策略生成plan物理计划
// executedPlan should not be used to initialize any SparkPlan. It should be
// only used for execution.
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) //最后生成已经准备好的Spark Plan /** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[Row] = executedPlan.execute() //最后调用toRDD方法运行任务将结果转换为RDD protected def stringOrError[A](f: => A): String =
try f.toString catch { case e: Throwable => e.toString } def simpleString: String = stringOrError(executedPlan) override def toString: String =
s"""== Logical Plan ==
|${stringOrError(analyzed)}
|== Optimized Logical Plan ==
|${stringOrError(optimizedPlan)}
|== Physical Plan ==
|${stringOrError(executedPlan)}
""".stripMargin.trim
}

至此整个流程结束。

四、总结:

通过分析SQLContext我们知道了Spark SQL都包括了哪些组件,SqlParser,Parser,Analyzer,Optimizer,LogicalPlan,SparkPlanner(包括Physical Plan),QueryExecution.

  通过调试代码,知道了Spark SQL的运行流程:

sql or hql -> sql parser(parse)生成 unresolved logical plan -> analyzer(analysis)生成analyzed logical plan  -> optimizer(optimize)optimized logical plan -> spark planner(use strategies to
plan)生成physical plan -> 採用不同Strategies生成spark plan -> spark plan(prepare) prepared spark plan -> call toRDD(execute()函数调用) 运行sql生成RDD


  

  随后还会对里面的每一个组件对象进行研究,看看catalyst到底做了哪些优化。

  

  ——EOF——

原创文章:转载请注明出自:http://blog.csdn.net/oopsoom/article/details/37658021