1、读《apache spark 源码剖析》第七章第2节
接着昨天的来
2、源码学习
DataSet中的ofRows方法调用了sparkSession.sessionState.executePlan,返回一个QueryExecution对象
类SessionState
类注释A class that holds all session-specific state in a given [[SparkSession]].可以看出这个类是保存sparkSession的状态的
executePlan方法调用了createQueryExecution方法,再跟这个方法,发现是创建的时候传入的
看sparkSession创建SessionState的代码,是昨天看过的,最后发现是在BaseSessionStateBuilder中定义的,定义为:
/**
* Create a query execution object.
*/
protected def createQueryExecution: LogicalPlan => QueryExecution = { plan =>
new QueryExecution(session, plan)
}
所有后续的生成analyzed logical plan、optimized logical plan、physical plan的步骤都是在这个类中调用相关方法处理的
Analyze:
调用sparkSession.sessionState.analyzer.execute
调用到RuleExecutor的execute,其中batches比较重要,是每个RuleExecutor不一样的地方,比如analyzer有ResolveRelations、ResolveReferences等
我也看书中的例子ResolveRelations的apply方法如果发现是UnresolvedRelation则调用resolveRelation(u),在方法内调用了
val defaultDatabase = AnalysisContext.get.defaultDatabase
val relation = lookupTableFromCatalog(u, defaultDatabase)
可以看出是在找数据实际存放的地方
Optimize:
调用sparkSession.sessionState.optimizer.execute
其他基本一样,只是batches和analyze不一样
sparkPlan:
调用planner.plan(ReturnAnswer(optimizedPlan)).next()
这个调用到了QueryPlanner的plan方法,类似于之前analyze和optimize,这里有类似batches的strategies,定义在具体的实现类中,在这里具体的实现类是SparkPlanner,比如他包含了JoinSelection、JoinSelection等
最后两步
// executedPlan should not be used to initialize any SparkPlan. It should be
// only used for execution.
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
/** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
第一轮的spark源码学习到此为止了,有很多地方没深入看,有些地方没看懂,以后再补,通过这次找到了一个学习源码的好方法,非常开心