在第一章《spark-submit提交作业过程》的时候,我们讲过Spark on yarn的在cluster模式下它的main class是org.apache.spark.deploy.yarn.Client。okay,这个就是我们的头号目标。
提交作业
找到main函数,里面调用了run方法,我们直接看run方法。
val appId = runApp()
monitorApplication(appId)
System.exit(0)
运行App,跟踪App,最后退出。我们先看runApp吧。
monitorApplication就不说了,不停的调用getApplicationReport方法获得最新的Report,然后调用getYarnApplicationState获取当前状态,如果状态为FINISHED、FAILED、KILLED就退出。
说到这里,顺便把跟yarn相关的参数也贴出来一下,大家一看就清楚了。
ApplicationMaster
直接看run方法就可以了,main函数就干了那么一件事...
run方法里面主要干了5项工作:
1、初始化工作
2、启动driver程序
3、注册ApplicationMaster
4、分配Executors
5、等待程序运行结束
我们重点看分配Executor方法。
这里面我们只需要看addResourceRequests和allocateResources方法即可。
先说addResourceRequests方法,代码就不贴了。
Client向ResourceManager提交Container的请求,分三种类型:优先选择机器、同一个rack的机器、任意机器。
优先选择机器是在RDD里面的getPreferredLocations获得的机器位置,如果没有优先选择机器,也就没有同一个rack之说了,可以是任意机器。
下面我们接着看allocateResources方法。
1、把从ResourceManager中获得的Container进行选择,选择顺序是按照前面的介绍的三种类别依次进行,优先选择机器 > 同一个rack的机器 > 任意机器。
2、选择了Container之后,给每一个Container都启动一个ExecutorRunner一对一贴身服务,给它发送运行CoarseGrainedExecutorBackend的命令。
3、ExecutorRunner通过NMClient来向NodeManager发送请求。
总结:
把作业发布到yarn上面去执行这块涉及到的类不多,主要是涉及到Client、ApplicationMaster、YarnAllocationHandler、ExecutorRunner这四个类。
1、Client作为Yarn的客户端,负责向Yarn发送启动ApplicationMaster的命令。
2、ApplicationMaster就像项目经理一样负责整个项目所需要的工作,包括请求资源,分配资源,启动Driver和Executor,Executor启动失败的错误处理。
3、ApplicationMaster的请求、分配资源是通过YarnAllocationHandler来进行的。
4、Container选择的顺序是:优先选择机器 > 同一个rack的机器 > 任意机器。
5、ExecutorRunner只负责向Container发送启动CoarseGrainedExecutorBackend的命令。
6、Executor的错误处理是在ApplicationMaster的launchReporterThread方法里面,它启动的线程除了报告运行状态,还会监控Executor的运行,一旦发现有丢失的Executor就重新请求。
7、在yarn目录下看到的名称里面带有YarnClient的是属于yarn-client模式的类,实现和前面的也差不多。
其它的内容更多是Yarn的客户端api使用,我也不太会,只是看到了能懂个意思,