5.2.4 作业提交到JobTracker

JobClient最终调用RPC方法submitJob将作业提交到JobTracker端,在JobTracker.submitJob中,会依次进行以下操作:

(1)为作业创建JobInProgress对象

JobTracker会为每个作业创建一个JobInProgress对象。该对象维护了作业的运行时信息。它在作业运行过程中一直存在,主要用于跟踪正在运行作业的运行状态和进度。

(2)检查用户是否具有指定队列的作业提交权限

Hadoop以队列为单位管理作业和资源,每个队列分配有一定量的资源,每个用户属于一个或者多个队列且只能使用所属队列中的资源。第4章中提到,管理员可为每个队列指定哪些用户具有作业提交权限和管理权限。

(3)检查作业配置的内存使用量是否合理

用户提交作业时,可分别用参数mapred.job.map.memory.mb和mapred.job.reduce.memory.mb指定Map Task和Reduce Task占用的内存量;而管理员可通过参数mapred.cluster.max.map.memory.mb和mapred.cluster.max.reduce.memory.mb限制用户配置的任务最大内存使用量,一旦用户配置的内存使用量超过系统限制,则作业提交失败。

(4)通知TaskScheduler初始化作业

JobTracker收到作业后,并不会马上对其初始化,而是交给调度器,由它按照一定的策略对作业进行初始化。之所以不选择JobTracker而让调度器初始化,主要考虑到以下两个原因:

❑作业一旦初始化后便会占用一定量的内存资源,为了防止大量初始化的作业排队等待调度而占用大量不必要的内存资源,Hadoop按照一定的策略选择性地初始化作业以节省内存资源;

❑任务调度器的职责是根据每个节点的资源使用情况对其分配最合适的任务,而只有经过初始化的作业才有可能得到调度,因而将作业初始化策略嵌到调度器中是一种比较合理的设计。

Hadoop的调度器是一个可插拔模块,用户可通过实现TaskScheduler接口设计自己的调度器。当前Hadoop默认的调度器是JobQueueTaskScheduler。它采用的调度策略是先来先服务(First In First Out, FIFO)。另外两个比较常用的调度器是Fair Scheduler和Capacity Scheduler,具体参考第10章。

JobTracker采用了观察者设计模式(也称为发布-订阅模式)将“提交新作业”这一事件告诉TaskScheduler,如图5-7所示。

5.2.4 作业提交到JobTracker - 图1

图 5-7 JobTracker采用观察者设计模式将作业变化(添加/删除/更新作业)通知TaskScheduler

JobTracker采用了观察者设计模式将“提交新作业”这一事件告诉TaskScheduler的相关代码如下:


private synchronized JobStatus addJob(JobID jobId, JobInProgress job)

throws IOException{

……

synchronized(jobs){

synchronized(taskScheduler){

jobs.put(job.getProfile().getJobID(),job);

for(JobInProgressListener listener:jobInProgressListeners){

listener.jobAdded(job);//依次通知每个已注册的JobInProgressListener

}

}

}

……

}


JobTracker启动时会根据配置参数mapred.jobtracker.taskScheduler构造相应的任务调度器,并调用它的start()方法进行初始化。在该方法中,调度器会向JobTracker注册JobInProgressListener对象以监听作业的添加/删除/更新等事件。以默认调度器JobQueueTaskScheduler为例,它的start()方法如下:


public synchronized void start()throws IOException{

super.start();

//此处的taskTrackerManager实际上就是JobTracker对象,向JobTracker注册一个

//JobQueueJobInProgressListener

taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);

eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);

eagerTaskInitializationListener.start();

//向JobTracker注册EagerTaskInitializationListener

taskTrackerManager.addJobInProgressListener(

eagerTaskInitializationListener);

}


在上面的代码中,JobQueueTaskScheduler向JobTracker注册了两个JobInProgress Listener:EagerTaskInitializationListener和JobQueueJobInProgressListener,它们分别用于作业初始化和作业排序(具体参考第10章)。需要注意的是,代码中的taskTrackerManager实际上就是JobTracker,其在JobTracker类中的相关代码如下:


public static JobTracker startTracker(JobConf conf, String identifier)

throws IOException, InterruptedException{

……

result=new JobTracker(conf, identifier);//创建唯一的JobTracker实例

result.taskScheduler.setTaskTrackerManager(result);//将JobTracker实例传递给TaskScheduler

……

}