spark2.2.0源码学习过程记录:Day10

时间:2021-08-12 20:50:51
1、读《apache spark 源码剖析》第七章第2节 接着昨天的来
2、源码学习 DataSet中的ofRows方法调用了sparkSession.sessionState.executePlan,返回一个QueryExecution对象
类SessionState 类注释A class that holds all session-specific state in a given [[SparkSession]].可以看出这个类是保存sparkSession的状态的 executePlan方法调用了createQueryExecution方法,再跟这个方法,发现是创建的时候传入的
看sparkSession创建SessionState的代码,是昨天看过的,最后发现是在BaseSessionStateBuilder中定义的,定义为: /** * Create a query execution object. */ protected def createQueryExecution: LogicalPlan => QueryExecution = { plan => new QueryExecution(session, plan) } 所有后续的生成analyzed logical plan、optimized logical plan、physical plan的步骤都是在这个类中调用相关方法处理的
Analyze: 调用sparkSession.sessionState.analyzer.execute 调用到RuleExecutor的execute,其中batches比较重要,是每个RuleExecutor不一样的地方,比如analyzer有ResolveRelations、ResolveReferences等 我也看书中的例子ResolveRelations的apply方法如果发现是UnresolvedRelation则调用resolveRelation(u),在方法内调用了 val defaultDatabase = AnalysisContext.get.defaultDatabase val relation = lookupTableFromCatalog(u, defaultDatabase) 可以看出是在找数据实际存放的地方
Optimize: 调用sparkSession.sessionState.optimizer.execute 其他基本一样,只是batches和analyze不一样
sparkPlan: 调用planner.plan(ReturnAnswer(optimizedPlan)).next() 这个调用到了QueryPlanner的plan方法,类似于之前analyze和optimize,这里有类似batches的strategies,定义在具体的实现类中,在这里具体的实现类是SparkPlanner,比如他包含了JoinSelection、JoinSelection等
最后两步 // executedPlan should not be used to initialize any SparkPlan. It should be // only used for execution. lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
/** Internal version of the RDD. Avoids copies and has no schema */ lazy val toRdd: RDD[InternalRow] = executedPlan.execute()

第一轮的spark源码学习到此为止了,有很多地方没深入看,有些地方没看懂,以后再补,通过这次找到了一个学习源码的好方法,非常开心