Hadoop调度源码分析 作业提交到完成初始化部分

时间:2021-05-02 14:33:38

MapReduce的调度可以分为两个部分:一是初始化部分,二是任务调度过程。下面结合上图来阐述这两个过程:
初始化部分
1.Masterreceives submitted tasks, and initializes it as map and reduce tasks;

2. After initialization, put the task into the pending task list;

3. Calculated local task list for each worker node;

4. Waiting for the worker to request tasks.


1)Master接收到客户端提交的作业,将其初始化为map task和reduce task.具体来讲:
JobClient将job通过JobSubmitter.submitJobInternal(job,cluster)提交给JobTracker,函数submitJobInternal会进行很多初始化的工作,检查Job的输入输出文件路径的格式、为作业计算输入切分块,建立作业统计信息,拷贝job.jar和配置文件到HDFS,最后提交到JobTracker并监视状态。
JobTracker是Cluster默认的接口ClientProtocol的一个实现,这个工作在Hadoop JobTracker启动时就会完成。很多朋友看到
status = submitClient.submitJob(
jobId, submitJobDir.toString(), TokenCache.getTokenStorage());
会找到发现submitClient 是ClientProtocol就再也跟不下去了,其实,在JobSubmitter的构造函数中,
JobSubmitter(FileSystem submitFs, ClientProtocol submitClient) ,通过Cluster.getClient()即可获得对应JobTracker的实例。
JobTracker.submitJob() -> JobTracker.addJob() ->通过JobInProgressListener,将作业添加给监听器,获得数据的情况。然后在JobTrackerMetricsInst上修改提交更新数据情况。
Jobtracker根据反射机制获取conf中定义的调度器,实现在Jobtracker的注册。由于Hadoop的作业调度模块使用了组件式开发,因此,TaskScheduler是其公共的接口,不同的调度组件需要实现该接口的内容。在默认的情况下,conf定义的调度类为JobQueueTaskScheduler,该类使用了FIFO的调度策略。在该类中添加了两个JobInProgressListener,分别为JobQueueJobInProgressLis tener和EagerTaskInitializationL istener两个作业生命周期状态变化的监听类。前者主要负责作业在队列信息的维护,后者主要负责作业内task的初始化的工作。

TaskTrackerManager是一个接口,JobTracker实现了这个接口,并在JobTracker启动的时候,将自身通过setTaskTrackerManager()设置初始化TaskTrackerManager。代码如下所示:
startTracker(**) {
...
result = new JobTracker(conf, clock, identifier);
result.taskScheduler.setTaskTrackerManager(result);//这里的taskScheduler已经在构造函数中指定到一种具体的实现,默认是JobQueueTaskScheduler
...
}
这样EagerTaskInitializationL istener在提交作业的时候,会jobAdd接口会直接调用TaskTrackerManager.initJob() 实际是Jobtracker.initJob(JobInProgress **) =》 JobInProgress.initTasks(),这个初始化工作是异步进行,不会block其它作业的执行,该方法主要执行Map task 和 Reduce task 的初始化工作,关键的部分列出:
public synchronized void initTasks()
throws IOException, KillInterruptedException {
.....
TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(jobId);
numMapTasks = taskSplitMetaInfo.length;//计算map节点的个数
.....
jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
//向Jobtracker汇报mapTask 和reduceTask的个数。
}
自此,一个job提交后,初始化为MapTasks 和ReduceTasks,并在等待执行。

下面就是TaskTracker通过Heartbeat向JobTracker请求执行task,然后JobTracker根据TaskScheduler提供的assignTask将task部署到各个TaskTracker上去执行,这部分的内容也很多,请大家等待新的解读。