Job Initialization
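The previous article in this series (MapReduce source code analysis of job submission, initialization, assignment, and computation: the submission part) ended with the Client calling the JobTracker's submitJob method over RPC. Job initialization picks up from exactly that point.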
The submitJob method in JobTracker:
public synchronized JobStatus submitJob(JobID jobId) throws IOException {
  if(jobs.containsKey(jobId)) {
    //job already running, don't start twice
    return jobs.get(jobId).getStatus();
  }
  // create a JobInProgress object that holds the job's runtime information
  JobInProgress job = new JobInProgress(jobId, this, this.conf);
  String queue = job.getProfile().getQueueName();
  if(!(queueManager.getQueues().contains(queue))) {
    new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId));
    throw new IOException("Queue \"" + queue + "\" does not exist");
  }
  // check for access: is the user allowed to submit jobs to this queue?
  try {
    checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
  } catch (IOException ioe) {
    LOG.warn("Access denied for user " + job.getJobConf().getUser()
        + ". Ignoring job " + jobId, ioe);
    new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
    throw ioe;
  }
  // Check the job if it cannot run in the cluster because of invalid memory
  // requirements.
  try {
    // Users set per-task memory with mapred.job.map.memory.mb and
    // mapred.job.reduce.memory.mb; administrators can cap it with
    // mapred.cluster.max.map.memory.mb and mapred.cluster.max.reduce.memory.mb.
    // A job that exceeds the cap is rejected at submission.
    checkMemoryRequirements(job);
  } catch (IOException ioe) {
    new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
    throw ioe;
  }
  // hand the job to the task scheduler's job queue; this also triggers initialization
  return addJob(jobId, job);
}
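The memory check in submitJob comes down to comparing the per-task memory a job requests against the cluster-wide cap. Below is a minimal sketch of that comparison, assuming a negative cap means "no limit configured". MemoryCheck and its methods are hypothetical; the real checkMemoryRequirements reads these values from the JobConf and may handle more cases than this.

```java
import java.io.IOException;

// Hypothetical stand-in for the memory check in JobTracker.submitJob.
// Assumption: a negative cluster cap means "no limit configured".
class MemoryCheck {

    // Throws if either per-task request exceeds its cluster-wide cap,
    // which in submitJob leads to cleanup and a rejected submission.
    static void check(long jobMapMb, long jobReduceMb,
                      long clusterMaxMapMb, long clusterMaxReduceMb) throws IOException {
        if (clusterMaxMapMb >= 0 && jobMapMb > clusterMaxMapMb) {
            throw new IOException("map memory " + jobMapMb
                    + " MB exceeds cluster limit " + clusterMaxMapMb + " MB");
        }
        if (clusterMaxReduceMb >= 0 && jobReduceMb > clusterMaxReduceMb) {
            throw new IOException("reduce memory " + jobReduceMb
                    + " MB exceeds cluster limit " + clusterMaxReduceMb + " MB");
        }
    }

    // Convenience wrapper: true if the job would pass the check.
    static boolean accepts(long jobMapMb, long jobReduceMb,
                           long clusterMaxMapMb, long clusterMaxReduceMb) {
        try {
            check(jobMapMb, jobReduceMb, clusterMaxMapMb, clusterMaxReduceMb);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```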
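In outline: addJob walks the JobTracker's listener list and calls jobAdded on each listener. This wakes up EagerTaskInitializationListener (through wait/notify), which calls JobTracker.initJob, which in turn calls JobInProgress.initTasks. initTasks creates a TaskInProgress for every map and reduce and caches them, for example the maps in nonRunningMapCache....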
Inside addJob:
private synchronized JobStatus addJob(JobID jobId, JobInProgress job) {
  totalSubmissions++;
  synchronized (jobs) {
    synchronized (taskScheduler) {
      jobs.put(job.getProfile().getJobID(), job);
      /*
       * Observer pattern: jobInProgressListeners holds the observers to notify,
       *   private final List<JobInProgressListener> jobInProgressListeners =
       *       new CopyOnWriteArrayList<JobInProgressListener>();
       * JobTracker populates it through addJobInProgressListener.
       */
      for (JobInProgressListener listener : jobInProgressListeners) {
        try {
          // the registered listeners belong to the task scheduler
          // (JobQueueTaskScheduler), so this call reaches the scheduler
          listener.jobAdded(job);
        } catch (IOException ioe) {
          LOG.warn("Failed to add and so skipping the job : "
              + job.getJobID() + ". Exception : " + ioe);
        }
      }
    }
  }
  myInstrumentation.submitJob(job.getJobConf(), jobId);
  return job.getStatus();
}
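The loop in addJob is a plain observer notification. The sketch below uses a hypothetical MiniTracker and a cut-down JobListener interface (not Hadoop's JobInProgressListener) to show two properties the loop relies on: a CopyOnWriteArrayList lets listeners be registered while another thread iterates, and catching IOException per listener means one failing listener does not stop the others from being notified.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, cut-down listener interface (not Hadoop's JobInProgressListener).
interface JobListener {
    void jobAdded(String jobId) throws IOException;
}

// Hypothetical subject that mimics the notification loop in addJob.
class MiniTracker {
    // Safe to iterate while other threads register listeners.
    private final List<JobListener> listeners = new CopyOnWriteArrayList<>();
    final List<String> warnings = new ArrayList<>();

    void addJobInProgressListener(JobListener listener) {
        listeners.add(listener);
    }

    void addJob(String jobId) {
        for (JobListener listener : listeners) {
            try {
                listener.jobAdded(jobId);
            } catch (IOException ioe) {
                // Record and move on, so later listeners are still notified.
                warnings.add("Failed to add and so skipping the job : " + jobId
                        + ". Exception : " + ioe);
            }
        }
    }
}
```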
listener.jobAdded here calls JobQueueJobInProgressListener.jobAdded. When is the JobQueueJobInProgressListener handed to the JobTracker? When the task scheduler starts. And where does the task scheduler come from? During JobTracker startup, which constructs the scheduler class named by the mapred.jobtracker.taskScheduler parameter.
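Choosing the scheduler through a configuration key is a reflection-based factory. The sketch below imitates it with a Map standing in for JobConf and plain java.lang.reflect. Scheduler, FifoScheduler, OtherScheduler, and SchedulerFactory are hypothetical; Hadoop itself goes through its own Configuration and reflection helpers rather than this code.

```java
import java.util.Map;

// Hypothetical scheduler hierarchy standing in for TaskScheduler and its subclasses.
abstract class Scheduler {
    abstract String name();
}

class FifoScheduler extends Scheduler {
    String name() { return "fifo"; }
}

class OtherScheduler extends Scheduler {
    String name() { return "other"; }
}

class SchedulerFactory {
    static final String KEY = "mapred.jobtracker.taskScheduler";

    // Loads the class named under KEY (default: FifoScheduler) and
    // instantiates it through its no-arg constructor.
    static Scheduler create(Map<String, String> conf) {
        String className = conf.getOrDefault(KEY, FifoScheduler.class.getName());
        try {
            Class<? extends Scheduler> cls =
                    Class.forName(className).asSubclass(Scheduler.class);
            return cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("cannot create scheduler " + className, e);
        }
    }
}
```

Because the scheduler class is only named in configuration, a different scheduler can be plugged in without recompiling the tracker.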
Starting the JobQueueTaskScheduler
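jobQueueJobInProgressListener is the first listener added to the JobTracker's listener list, so a job is put into the job queue before its initialization is requested.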
public synchronized void start() throws IOException {
  super.start();
  // taskTrackerManager is actually the JobTracker instance: this registers
  // jobQueueJobInProgressListener with the JobTracker, so the
  // listener.jobAdded(job) call in addJob lands in jobQueueJobInProgressListener
  taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
  eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
  eagerTaskInitializationListener.start();
  // eagerTaskInitializationListener initializes jobs: it calls JobTracker.initJob,
  // which calls JobInProgress.initTasks
  taskTrackerManager.addJobInProgressListener(
      eagerTaskInitializationListener);
}
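The ordering argument for start() can be checked with a toy tracker: listeners are notified in the order they were registered, so the queue listener sees a job before the initialization listener does. ListenerOrder and the event strings are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical toy tracker: notifies listeners in registration order.
class ListenerOrder {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    void addJobInProgressListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    void addJob(String jobId) {
        for (Consumer<String> listener : listeners) {
            listener.accept(jobId);
        }
    }

    // Registers listeners in the same order as JobQueueTaskScheduler.start():
    // the job-queue listener first, then the eager-initialization listener.
    static List<String> demo(String jobId) {
        List<String> events = new ArrayList<>();
        ListenerOrder tracker = new ListenerOrder();
        tracker.addJobInProgressListener(id -> events.add("queued " + id));
        tracker.addJobInProgressListener(id -> events.add("init requested " + id));
        tracker.addJob(jobId);
        return events;
    }
}
```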
The jobAdded method of jobQueueJobInProgressListener:
public void jobAdded(JobInProgress job) {
  jobQueue.put(new JobSchedulingInfo(job.getStatus()), job);
}
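jobQueue is a sorted map keyed by JobSchedulingInfo, so iterating it yields jobs in scheduling order. The sketch below assumes the FIFO ordering compares priority first, then start time, then job id. SchedInfo and FifoQueue are hypothetical stand-ins, and the int priority (lower runs first) replaces Hadoop's JobPriority enum.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, trimmed-down JobSchedulingInfo holding only the sort keys.
final class SchedInfo {
    final int priority;     // assumption: lower value is scheduled first
    final long startTime;
    final String jobId;

    SchedInfo(int priority, long startTime, String jobId) {
        this.priority = priority;
        this.startTime = startTime;
        this.jobId = jobId;
    }
}

class FifoQueue {
    // Assumed FIFO order: priority, then start time, then job id as a tie-breaker.
    static final Comparator<SchedInfo> FIFO =
            Comparator.comparingInt((SchedInfo s) -> s.priority)
                    .thenComparingLong(s -> s.startTime)
                    .thenComparing(s -> s.jobId);

    final Map<SchedInfo, String> jobQueue =
            Collections.synchronizedMap(new TreeMap<SchedInfo, String>(FIFO));

    // Same shape as JobQueueJobInProgressListener.jobAdded.
    void jobAdded(SchedInfo info, String job) {
        jobQueue.put(info, job);
    }

    // Iterating a synchronizedMap view must hold the map's lock.
    List<String> scheduleOrder() {
        synchronized (jobQueue) {
            return new ArrayList<>(jobQueue.values());
        }
    }
}
```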
The scheduler above is constructed when the JobTracker starts:
public static JobTracker startTracker(JobConf conf, String identifier)
    throws IOException, InterruptedException {
  JobTracker result = null;
  while (true) {
    try {
      // the JobTracker constructor creates the scheduler named by mapred.jobtracker.taskScheduler
      result = new JobTracker(conf, identifier);
      // hand the JobTracker instance to the taskScheduler
      result.taskScheduler.setTaskTrackerManager(result);
      ....
JobTracker's main method calls startTracker:
public static void main(String argv[]) throws IOException, InterruptedException {
  ....
  try {
    JobTracker tracker = startTracker(new JobConf());
    // start the service threads. interTrackerServer is an ipc.Server (its inner
    // Listener class runs as a daemon and accepts connections with NIO); the web
    // UI is served by infoServer = new HttpServer("job", infoBindAddress,
    // tmpInfoPort, tmpInfoPort == 0, conf), an org.apache.hadoop.http.HttpServer
    tracker.offerService();
    ....
What HttpServer does, according to its Javadoc:
* Create a Jetty embedded server to answer http requests. The primary goal
* is to serve up status information for the server
* There are three contexts:
* "/logs/" -> points to the log directory
* "/static/" -> points to common static files (src/webapps/static)
* "/" -> the jsp server code from (src/webapps/<name>)
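The three contexts amount to prefix routing: a request goes to the most specific context whose path matches it. The toy router below only imitates that idea for the listed contexts (Jetty performs the real dispatch). ContextRouter is hypothetical, and "job" is the server name passed in new HttpServer("job", ...).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical toy router for the three contexts listed in the HttpServer Javadoc.
class ContextRouter {
    private final Map<String, String> contexts = new LinkedHashMap<>();

    ContextRouter(String name) {
        contexts.put("/logs/", "log directory");
        contexts.put("/static/", "src/webapps/static");
        contexts.put("/", "src/webapps/" + name);   // JSP pages for this server
    }

    // Longest matching prefix wins, so "/" only receives requests that
    // neither "/logs/" nor "/static/" matches.
    String resolve(String path) {
        String best = null;
        for (String prefix : contexts.keySet()) {
            if (path.startsWith(prefix) && (best == null || prefix.length() > best.length())) {
                best = prefix;
            }
        }
        return best == null ? null : contexts.get(best);
    }
}
```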
Initializing the Job
The scheduler:
Every listener is registered with the JobTracker, and JobTracker.submitJob (through addJob) calls each listener's jobAdded method:
eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
eagerTaskInitializationListener.start();
// eagerTaskInitializationListener initializes jobs: it calls JobTracker.initJob,
// which calls JobInProgress.initTasks
taskTrackerManager.addJobInProgressListener(
    eagerTaskInitializationListener);
Detail: jobAdded is what triggers initJob.
The EagerTaskInitializationListener class:
....
class JobInitManager implements Runnable {
  public void run() {
    JobInProgress job = null;
    while (true) {
      try {
        synchronized (jobInitQueue) {
          while (jobInitQueue.isEmpty()) {
            jobInitQueue.wait();
          }
          job = jobInitQueue.remove(0);
        }
        threadPool.execute(new InitJob(job));
      } catch (InterruptedException t) {
        LOG.info("JobInitManagerThread interrupted.");
        break;
      }
    }
    ....
class InitJob implements Runnable {
  private JobInProgress job;
  public InitJob(JobInProgress job) {
    this.job = job;
  }
  public void run() {
    ttm.initJob(job); // ttm here is the JobTracker
  }
}
....
public void jobAdded(JobInProgress job) {
  synchronized (jobInitQueue) {
    jobInitQueue.add(job);
    resortInitQueue();
    jobInitQueue.notifyAll();
  }
}
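jobAdded and JobInitManager form a producer/consumer pair around jobInitQueue: the producer adds a job and calls notifyAll, and the consumer waits while the queue is empty, then hands each job to a thread pool so that slow split reading does not hold up the next job. The sketch below reproduces that handshake with strings for jobs and a Consumer standing in for ttm.initJob. EagerInit is hypothetical and leaves out resortInitQueue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, simplified EagerTaskInitializationListener: jobAdded produces,
// a manager thread consumes and hands each job to a thread pool.
class EagerInit {
    private final List<String> jobInitQueue = new ArrayList<>();
    private final ExecutorService threadPool = Executors.newFixedThreadPool(2);
    private final Thread manager;

    EagerInit(Consumer<String> initJob) {   // initJob stands in for ttm.initJob(job)
        manager = new Thread(() -> {
            while (true) {
                String job;
                try {
                    synchronized (jobInitQueue) {
                        while (jobInitQueue.isEmpty()) {
                            jobInitQueue.wait();   // releases the lock until notified
                        }
                        job = jobInitQueue.remove(0);
                    }
                } catch (InterruptedException e) {
                    break;                         // shutdown requested
                }
                final String next = job;
                threadPool.execute(() -> initJob.accept(next));
            }
        }, "JobInitManager");
        manager.setDaemon(true);
        manager.start();
    }

    // Producer side, same shape as jobAdded above (minus resortInitQueue).
    void jobAdded(String job) {
        synchronized (jobInitQueue) {
            jobInitQueue.add(job);
            jobInitQueue.notifyAll();
        }
    }

    // Stops the manager thread, then lets already-submitted initializations finish.
    void terminate() {
        manager.interrupt();
        try {
            manager.join();
            threadPool.shutdown();
            threadPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```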
JobInProgress.initTasks creates the job's TaskInProgress objects and caches them until tasks are assigned and sent to TaskTrackers:
/**
 * Construct the splits, etc. This is invoked from an async
 * thread so that split-computation doesn't block anyone.
 */
public synchronized void initTasks()
    throws IOException, KillInterruptedException {
  if (tasksInited.get() || isComplete()) {
    return;
  }
  synchronized(jobInitKillStatus){
    if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) {
      return;
    }
    jobInitKillStatus.initStarted = true;
  }
  LOG.info("Initializing " + jobId);
  // log job info (jobFile here is a field; the local String jobFile
  // declared below shadows it from that point on)
  JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(),
      this.startTime, hasRestarted());
  // log the job priority
  setPriority(this.priority);
  //
  // read input splits and create one map per split
  //
  String jobFile = profile.getJobFile();
  Path sysDir = new Path(this.jobtracker.getSystemDir());
  FileSystem fs = sysDir.getFileSystem(conf);
  DataInputStream splitFile =
      fs.open(new Path(conf.get("mapred.job.split.file")));
  JobClient.RawSplit[] splits;
  try {
    splits = JobClient.readSplitFile(splitFile);
  } finally {
    splitFile.close();
  }
  numMapTasks = splits.length;
  // if the number of splits is larger than a configured value
  // then fail the job.
  int maxTasks = jobtracker.getMaxTasksPerJob();
  if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) {
    throw new IOException(
        "The number of tasks for this job " +
        (numMapTasks + numReduceTasks) +
        " exceeds the configured limit " + maxTasks);
  }
  jobtracker.getInstrumentation().addWaiting(
      getJobID(), numMapTasks + numReduceTasks);
  maps = new TaskInProgress[numMapTasks];
  for(int i=0; i < numMapTasks; ++i) {
    inputLength += splits[i].getDataLength();
    maps[i] = new TaskInProgress(jobId, jobFile,
                                 splits[i],
                                 jobtracker, conf, this, i);
  }
  LOG.info("Input size for job " + jobId + " = " + inputLength
      + ". Number of splits = " + splits.length);
  if (numMapTasks > 0) {
    nonRunningMapCache = createCache(splits, maxLevel);
  }
  // set the launch time
  this.launchTime = System.currentTimeMillis();
  //
  // Create reduce tasks
  //
  this.reduces = new TaskInProgress[numReduceTasks];
  for (int i = 0; i < numReduceTasks; i++) {
    reduces[i] = new TaskInProgress(jobId, jobFile,
                                    numMapTasks, i,
                                    jobtracker, conf, this);
    nonRunningReduces.add(reduces[i]);
  }
  // Calculate the minimum number of maps to be complete before
  // we should start scheduling reduces
  completedMapsForReduceSlowstart =
      (int)Math.ceil(
          (conf.getFloat("mapred.reduce.slowstart.completed.maps",
                         DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *
           numMapTasks));
  // create two cleanup tips, one map and one reduce.
  cleanup = new TaskInProgress[2];
  // cleanup map tip. This map doesn't use any splits. Just assign an empty
  // split.
  JobClient.RawSplit emptySplit = new JobClient.RawSplit();
  cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
      jobtracker, conf, this, numMapTasks);
  cleanup[0].setJobCleanupTask();
  // cleanup reduce tip.
  cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
      numReduceTasks, jobtracker, conf, this);
  cleanup[1].setJobCleanupTask();
  // create two setup tips, one map and one reduce.
  setup = new TaskInProgress[2];
  // setup map tip. This map doesn't use any split. Just assign an empty
  // split.
  setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
      jobtracker, conf, this, numMapTasks + 1);
  setup[0].setJobSetupTask();
  // setup reduce tip.
  setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
      numReduceTasks + 1, jobtracker, conf, this);
  setup[1].setJobSetupTask();
  synchronized(jobInitKillStatus){
    jobInitKillStatus.initDone = true;
    if(jobInitKillStatus.killed) {
      throw new KillInterruptedException("Job " + jobId + " killed in init");
    }
  }
  tasksInited.set(true);
  JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime,
      numMapTasks, numReduceTasks);
}
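The counting in initTasks is easy to check by hand. The sketch below reproduces the max-tasks guard, the reduce slow-start threshold ceil(fraction * numMapTasks), and the partition numbers given to the cleanup and setup TIPs. InitTasksMath is hypothetical; it uses double where the original uses float, and it takes the fraction as a parameter because the value of DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART is not shown above.

```java
import java.io.IOException;

// Hypothetical helpers reproducing the arithmetic in JobInProgress.initTasks.
class InitTasksMath {

    // Same guard as initTasks: a non-positive limit means "no limit".
    static void checkTaskLimit(int numMaps, int numReduces, int maxTasks) throws IOException {
        if (maxTasks > 0 && numMaps + numReduces > maxTasks) {
            throw new IOException("The number of tasks for this job "
                    + (numMaps + numReduces) + " exceeds the configured limit " + maxTasks);
        }
    }

    static boolean withinTaskLimit(int numMaps, int numReduces, int maxTasks) {
        try {
            checkTaskLimit(numMaps, numReduces, maxTasks);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Minimum completed maps before reduces may be scheduled: ceil(fraction * numMaps).
    static int completedMapsForReduceSlowstart(double fraction, int numMaps) {
        return (int) Math.ceil(fraction * numMaps);
    }

    // Map side: real maps use 0..numMaps-1,
    // the cleanup map uses numMaps and the setup map numMaps + 1.
    static int[] auxMapPartitions(int numMaps) {
        return new int[] {numMaps, numMaps + 1};
    }

    // Reduce side: real reduces use 0..numReduces-1,
    // the cleanup reduce uses numReduces and the setup reduce numReduces + 1.
    static int[] auxReducePartitions(int numReduces) {
        return new int[] {numReduces, numReduces + 1};
    }
}
```

For example, assuming a slow-start fraction of 0.05, a job with 100 maps can start scheduling reduces once 5 maps have completed, while a job with 10 maps needs only 1 (ceil(0.5)).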