MapReduce的调度可以分为两个部分:一是初始化部分,二是任务调度过程。下面结合上图来阐述这两个过程:
初始化部分
1.Masterreceives submitted tasks, and initializes it as map and reduce tasks;
2. After initialization, put the task into the pending task list;
3. Calculated local task list for each worker node;
4. Waiting for the worker to request tasks.
1)Master接收到客户端提交的作业,将其初始化为map task和reduce task.具体来讲:
JobClient将job通过JobSubmitter.submitJobInternal(job,cluster)提交给JobTracker,函数submitJobInternal会进行很多初始化的工作,检查Job的输入输出文件路径的格式、为作业计算输入切分块,建立作业统计信息,拷贝job.jar和配置文件到HDFS,最后提交到JobTracker并监视状态。
JobTracker是Cluster默认的接口ClientProtocol的一个实现,这个工作在Hadoop JobTracker启动时就会完成。很多朋友看到
status = submitClient.submitJob(
会找到发现submitClient 是ClientProtocol就再也跟不下去了,其实,在JobSubmitter的构造函数中,
JobSubmitter(FileSystem submitFs, ClientProtocol submitClient) ,通过Cluster.getClient()即可获得对应JobTracker的实例。
JobTracker.submitJob() -> JobTracker.addJob() ->通过JobInProgressListener,将作业添加给监听器,获得数据的情况。然后在JobTrackerMetricsInst上修改提交更新数据情况。
Jobtracker根据反射机制获取conf中定义的调度器,实现在Jobtracker的注册。由于Hadoop的作业调度模块使用了组件式开发,因此,TaskScheduler是其公共的接口,不同的调度组件需要实现该接口的内容。在默认的情况下,conf定义的调度类为JobQueueTaskScheduler,该类使用了FIFO的调度策略。在该类中添加了两个JobInProgressListener,分别为JobQueueJobInProgressLis
TaskTrackerManager是一个接口,JobTracker实现了这个接口,并在JobTracker启动的时候,将自身通过setTaskTrackerManager()设置初始化TaskTrackerManager。代码如下所示:
startTracker(**) {
...
result = new JobTracker(conf, clock, identifier);
result.taskScheduler.setTaskTrackerManager(result);//这里的taskScheduler已经在构造函数中指定到一种具体的实现,默认是JobQueueTaskScheduler
...
}
这样EagerTaskInitializationL
public synchronized void initTasks()
.....
.....
//向Jobtracker汇报mapTask 和reduceTask的个数。
}
自此,一个job提交后,初始化为MapTasks 和ReduceTasks,并在等待执行。
下面就是TaskTracker通过Heartbeat向JobTracker请求执行task,然后JobTracker根据TaskScheduler提供的assignTask将task部署到各个TaskTracker上去执行,这部分的内容也很多,请大家等待新的解读。