6.1.3 初始化作业
在客户端用户作业调用JobTracker对象的submitJob()方法后,JobTracker会把此调用放入内部的TaskScheduler变量中,然后进行调度,默认的调度方法是JobQueueTaskScheduler,也就是FIFO调度方式。当客户作业被调度执行时,JobTracker会创建一个代表这个作业的JobInProgress对象,并将任务和记录信息封装到这个对象中,以便跟踪任务的状态和进程。接下来JobInProgress对象的initTasks函数会对任务进行初始化操作(见图6-1的步骤⑤)。下面仍然从initTasks函数的代码出发详细讲解初始化过程。
public synchronized void initTasks()throws IOException{
……
//从HDFS中作业对应的路径读取job.split文件,生成input
//splits为下面Map的划分做好准备
TaskSplitMetaInfo[]splits=createSplits(jobId);
//根据input split设置Map Task个数
numMapTasks=splits.length;
for(TaskSplitMetaInfo split:splits){
NetUtils.verifyHostnames(split.getLocations());}
//为每个Map Tasks生成一个TaskInProgress来处理一个input split
maps=new TaskInProgress[numMapTasks];
for(int i=0;i<numMapTasks;++i){
inputLength+=splits[i].getInputDataLength();
maps[i]=new TaskInProgress(jobId, jobFile, splits[i],jobtracker, conf,
this, i,numSlotsPerMap);}
if(numMapTasks>0){
//map task放入nonRunningMapCache,其将在JobTracker向
//TaskTracker分配Map Task的时候使用
nonRunningMapCache=createCache(splits, maxLevel);
}
//创建Reduce Task
this.reduces=new TaskInProgress[numReduceTasks];
for(int i=0;i<numReduceTasks;i++){
reduces[i]=new TaskInProgress(jobId, jobFile, numMapTasks, i,jobtracker,
conf, this, numSlotsPerReduce);
//Reduce Task放入nonRunningReduces,其将在JobTracker向
//TaskTracker分配Reduce Task的时候使用
nonRunningReduces.add(reduces[i]);
}
//清理Map和Reduce
cleanup=new TaskInProgress[2];
TaskSplitMetaInfo emptySplit=JobSplit.EMPTY_TASK_SPLIT;
cleanup[0]=new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf,
this, numMapTasks);
cleanup[0].setJobCleanupTask();
cleanup[1]=new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks,
jobtracker, conf, this,1);
cleanup[1].setJobCleanupTask();
//创建两个初始化Task,一个初始化Map,一个初始化Reduce
setup=new TaskInProgress[2];
setup[0]=new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf,
this, numMapTasks+1,1);
setup[0].setJobSetupTask();
setup[1]=new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks+1,
jobtracker, conf, this,1);
setup[1].setJobSetupTask();
tasksInited=true;//初始化完毕
……
}
从上面的代码可以看出初始化过程主要有以下步骤:
1)从HDFS中读取作业对应的job.split(见图6-1的步骤⑥)。JobTracker从HDFS中作业对应的路径获取JobClient在步骤③中写入的job.split文件,得到输入数据的划分信息,为后面初始化过程中Map任务的分配做好准备。
2)创建并初始化Map任务和Reduce任务。initTasks先根据输入数据划分信息中的个数设定Map Task的个数,然后为每个Map Task生成一个TaskInProgress来处理input split,并将Map Task放入nonRunningMapCache,以便在JobTracker向TaskTracker分配Map Task的时候使用。接下来根据JobConf中的mapred.reduce.tasks属性利用setNumReduceTasks()方法来设置reduce task的个数,然后采用类似Map Task的方式将Reduce Task放入nonRunningReduces中,以便向TaskTracker分配Reduce Task时使用。
3)最后就是创建两个初始化Task,根据个数和输入划分已经配置的信息,并分别初始化Map和Reduce。