6.1.4 分配任务

在前面的介绍中我们已经知道,TaskTracker和JobTracker之间的通信和任务的分配是通过心跳机制完成的。TaskTracker作为一个单独的JVM执行一个简单的循环,主要实现每隔一段时间向JobTracker发送心跳(Heartbeat):告诉JobTracker此TaskTracker是否存活,是否准备执行新的任务。JobTracker接收到心跳信息,如果有待分配任务,它就会为TaskTracker分配一个任务,并将分配信息封装在心跳通信的返回值中返回给TaskTracker。TaskTracker从心跳方法的Response中得知此TaskTracker需要做的事情,如果是一个新的Task则将它加入本机的任务队列中(见图6-1的步骤⑦)。

下面从TaskTracker中的transmitHeartBeat()方法和JobTracker中的heartbeat()方法的主要代码出发,介绍任务分配的详细过程,以及在此过程中TaskTracker和JobTracker的通信。

TaskTracker中transmitHeartBeat()方法的主要代码:


//向JobTracker报告TaskTracker的当前状态

if(status==null){

synchronized(this){

status=new TaskTrackerStatus(taskTrackerName, localHostname, httpPort, cloneAndRe

setRunningTaskStatuses(sendCounters),failures, maxMapSlots, maxReduceSlots);

}

}

……

//根据条件是否满足来确定此TaskTracker是否请求JobTracker

//为其分配新的Task

boolean askForNewTask;

long localMinSpaceStart;

synchronized(this){

askForNewTask=(status.countMapTasks()<maxCurrentMapTasks||

status.countReduceTasks()<maxCurrentReduceTasks)&&acceptNewTasks;

localMinSpaceStart=minSpaceStart;

}

……

//向JobTracker发送heartbeat

HeartbeatResponse heartbeatResponse=jobClient.heartbeat(status, justStarted,

justInited, askForNewTask, heartbeatResponseId);

……

JobTracker中heartbeat()方法的主要代码:

……

String trackerName=status.getTrackerName();

……

//如果TaskTracker向JobTracker请求一个Task运行

if(recoveryManager.shouldSchedule()&&acceptNewTasks&&!isBlacklisted){

TaskTrackerStatus taskTrackerStatus=getTaskTracker(trackerName);

if(taskTrackerStatus==null){

LOG.warn("Unknown task tracker polling;ignoring:"+trackerName);

}else{

List<Task>tasks=getSetupAndCleanupTasks(taskTrackerStatus);

if(tasks==null){

//任务调度器分配任务

tasks=taskScheduler.assignTasks(taskTrackers.get(trackerName));

}

if(tasks!=null){

for(Task task:tasks){

//将任务返回给TaskTracker

expireLaunchingTasks.addNewTask(task.getTaskID());

actions.add(new LaunchTaskAction(task));

}}}}……


上面两段代码展示了TaskTracker和JobTracker之间通过心跳通信汇报状态与分配任务的详细过程。TaskTracker首先发送自己的状态(主要是Map任务和Reduce任务的个数是否小于上限),并根据自身条件选择是否向JobTracker请求新的Task,最后发送心跳。JobTracker接收到TaskTracker的心跳后首先分析心跳信息,如果发现TaskTracker在请求一个Task,那么任务调度器就会将任务和任务信息封装起来返回给TaskTracker。

针对Map任务和Reduce任务,TaskTracker有固定数量的任务槽(Map任务和Reduce任务的个数都有上限)。当TaskTracker从JobTracker返回的心跳信息中获取新的任务信息时,它会将Map任务或者Reduce任务加入对应的任务槽中。需要注意的是,在JobTracker为TaskTracker分配Map任务时,为了减小网络带宽,会考虑将map任务数据本地化。它会根据TaskTracker的网络位置,选取一个距离此TaskTracker map任务最近的输入划分文件分配给此TaskTracker。最好的情况是,划分文件就在TaskTracker本地(TaskTracker往往是运行在HDFS的DataNode中,所以这种情况是存在的)。