6.1.3 初始化作业

在客户端用户作业调用JobTracker对象的submitJob()方法后,JobTracker会把此调用放入内部的TaskScheduler变量中,然后进行调度,默认的调度方法是JobQueueTaskScheduler,也就是FIFO调度方式。当客户作业被调度执行时,JobTracker会创建一个代表这个作业的JobInProgress对象,并将任务和记录信息封装到这个对象中,以便跟踪任务的状态和进程。接下来JobInProgress对象的initTasks函数会对任务进行初始化操作(见图6-1的步骤⑤)。下面仍然从initTasks函数的代码出发详细讲解初始化过程。


public synchronized void initTasks()throws IOException{

……

//从HDFS中作业对应的路径读取job.split文件,生成input

//splits为下面Map的划分做好准备

TaskSplitMetaInfo[]splits=createSplits(jobId);

//根据input split设置Map Task个数

numMapTasks=splits.length;

for(TaskSplitMetaInfo split:splits){

NetUtils.verifyHostnames(split.getLocations());}

//为每个Map Tasks生成一个TaskInProgress来处理一个input split

maps=new TaskInProgress[numMapTasks];

for(int i=0;i<numMapTasks;++i){

inputLength+=splits[i].getInputDataLength();

maps[i]=new TaskInProgress(jobId, jobFile, splits[i],jobtracker, conf,

this, i,numSlotsPerMap);}

if(numMapTasks>0){

//map task放入nonRunningMapCache,其将在JobTracker向

//TaskTracker分配Map Task的时候使用

nonRunningMapCache=createCache(splits, maxLevel);

}

//创建Reduce Task

this.reduces=new TaskInProgress[numReduceTasks];

for(int i=0;i<numReduceTasks;i++){

reduces[i]=new TaskInProgress(jobId, jobFile, numMapTasks, i,jobtracker,

conf, this, numSlotsPerReduce);

//Reduce Task放入nonRunningReduces,其将在JobTracker向

//TaskTracker分配Reduce Task的时候使用

nonRunningReduces.add(reduces[i]);

}

//清理Map和Reduce

cleanup=new TaskInProgress[2];

TaskSplitMetaInfo emptySplit=JobSplit.EMPTY_TASK_SPLIT;

cleanup[0]=new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf,

this, numMapTasks);

cleanup[0].setJobCleanupTask();

cleanup[1]=new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks,

jobtracker, conf, this,1);

cleanup[1].setJobCleanupTask();

//创建两个初始化Task,一个初始化Map,一个初始化Reduce

setup=new TaskInProgress[2];

setup[0]=new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf,

this, numMapTasks+1,1);

setup[0].setJobSetupTask();

setup[1]=new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks+1,

jobtracker, conf, this,1);

setup[1].setJobSetupTask();

tasksInited=true;//初始化完毕

……

}


从上面的代码可以看出初始化过程主要有以下步骤:

1)从HDFS中读取作业对应的job.split(见图6-1的步骤⑥)。JobTracker从HDFS中作业对应的路径获取JobClient在步骤③中写入的job.split文件,得到输入数据的划分信息,为后面初始化过程中Map任务的分配做好准备。

2)创建并初始化Map任务和Reduce任务。initTasks先根据输入数据划分信息中的个数设定Map Task的个数,然后为每个Map Task生成一个TaskInProgress来处理input split,并将Map Task放入nonRunningMapCache,以便在JobTracker向TaskTracker分配Map Task的时候使用。接下来根据JobConf中的mapred.reduce.tasks属性利用setNumReduceTasks()方法来设置reduce task的个数,然后采用类似Map Task的方式将Reduce Task放入nonRunningReduces中,以便向TaskTracker分配Reduce Task时使用。

3)最后就是创建两个初始化Task,根据个数和输入划分已经配置的信息,并分别初始化Map和Reduce。