这次讲的是Executor,啥是Executor呢?Executor是干什么的?
当我们在生产环境中提交spark作业时,用spark-submit shell脚本里,往往需要指定一个参数:--num-executors,你可以指定为3,5。。。这难道是随便指定的?爱设多少就设多少?很显然,答案是no,首先你得根据你集群的资源情况来设定。那我们就有必要来弄清楚这个是什么。Executor是Worker节点上的进程,用来执行程序任务的。咳咳,注意了,下面开始画今天的重点:
1.怎么启动Executor进程的;
2.Executor是怎么进行任务调度的。
首先,来个概览图,以下我们讲的都是Standalone模式。1.CoarseGrainedExecutorBackend线程起来后会向driver发RegisteredExecutor消息,告诉driver我这边即将要起Executor进程啦,好让driver上有记录在哪个worker上起了Executor进程;driver注册成功后,会返回RegisteredExecutor消息,告诉说你可以起Executor了;
2.CoarseGrainedExecutorBackend线程接受到RegisteredExecutor消息,就会创建executor对象;
3.Driver内taskScheduler会向CoarseGrainedExecutorBackend线程发送lauchTask消息,就是调用executor的lauchTask方法启动一个TaskRunner线程,这个就是更细粒度的,其实真正执行任务的就是这些线程啦;
4.然后每来一个task就从线程池里拿出一个线程运行。
可以先稍微整理一下,感兴趣的可以继续往下看,从源码角度一步步跟踪下去。
启动Executor的流程图
首先我们从sparkContext在初始化的时候,记不记得有个taskScheduler.start(), 让我们从这里开始
然后我们继续进入到这个start()方法里
第一部分都是一些配置参数,这些配置参数都是要启动Executor所需要的一些参数,像driver所在的地址啊,worker所在的地址啊,要启动的Executor的核数啊,等等等
第二部分(分开截图,因为我电脑屏幕原因,只能把这个start()方法分成两部分)
进入client的start()方法里:
让我们看看这个AppClient还有一个生命周期方法onStart()
然后我们进入Master里:
接下去进入schedule()方法里,让我们看Master是怎么进行调度的
这时候Master就会向Worker发送消息,说你该起executor啦,并把这些信息封装发给Worker,接下来让我来看看Worker:
重点是new了一个executorRunner,并且start()
起了一个线程,然后放到线程池里,具体细节就不在往下跟了。
我们再回到CoarseGrainedExecutorBackend这类中,从CoarseGrainedExecutorBackend这个类的main方法入手,里面有个run方法:
在run方法里创建了一个CoarseGrainedExecutorBackend对象,用于与DriverActor进行通信的
接着会调用CoarseGrainedExecutorBackend的生命周期方法onStart,向driverActor注册
CoarseGrainedExecutorBackend收到driverActor的反馈信息后就创建Executor对象
欢迎关注个人微信公众号:BigData共享
文章来自:
https://mp.weixin.qq.com/s?__biz=MzU2NzA3OTEwMg==&mid=2247483746&idx=1&sn=d79585a5f7b040348a622c19311dd6f2&chksm=fca3f8b3cbd471a585afbed49f8a6685fb14243568d948ff1affc1e9adc0deb455379ddf32a4#rd