6.3 心跳接收与应答

心跳是沟通TaskTracker与JobTracker的桥梁,它实际上是一个RPC函数。TaskTracker周期性地调用该函数汇报节点和任务状态信息,从而形成心跳。在Hadoop中,心跳主要有三个作用:

❑判断TaskTracker是否活着。

❑及时让JobTracker获取各个节点上的资源使用情况和任务运行状态。

❑为TaskTracker分配任务。

注意 JobTracker与TaskTracker之间采用了“pull”而不是“push”模型,即JobTracker从不会主动向TaskTracker发送任何信息,而是由TaskTracker主动通过心跳“领取”属于自己的信息。JobTracker只能通过心跳应答的形式为各个TaskTracker分配任务。

TaskTracker周期性地调用RPC函数heartbeat向JobTracker汇报信息和领取任务。该函数定义如下:


public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,

boolean restarted,

boolean initialContact,

boolean acceptNewTasks,

short responseId)


该函数的各个参数含义如下。

❑Status:该参数封装了TaskTracker上的各种状态信息,包括


String trackerName;//TaskTracker名称,形式如tracker_mymachine:localhost.

localdomain/127.0.0.1:34196

String host;//TaskTracker主机名

int httpPort;//TaskTracker对外的HTTP端口号

int failures;//该TaskTracker上已经失败的任务总数

List<TaskStatus>taskReports;//正在运行的各个任务运行状态

volatile long lastSeen;//上次汇报心跳的时间

private int maxMapTasks;/*Map slot总数,即允许同时运行的Map Task总数,由参数mapred.

tasktracker.map.tasks.maximum设定*/

private int maxReduceTasks;//Reduce slot总数

private TaskTrackerHealthStatus healthStatus;//TaskTracker健康状态

private ResourceStatus resStatus;//TaskTracker资源(内存,CPU等)信息


❑Restarted:表示TaskTracker是否刚刚重新启动。

❑initialContact:表示TaskTracker是否初次连接JobTracker。

❑acceptNewTasks:表示TaskTracker是否可以接收新任务,这通常取决于slot是否有剩余和节点健康状况等。

❑responseId:表示心跳响应编号,用于防止重复发送心跳。每接收一次心跳后,该值加1。

该函数的返回值为一个HeartbeatResponse对象,该对象主要封装了JobTracker向TaskTracker下达的命令,具体如下:


class HeartbeatResponse implements Writable, Configurable{

……

short responseId;//心跳响应编号

int heartbeatInterval;//下次心跳的发送间隔

TaskTrackerAction[]actions;/*来自JobTracker的命令,可能包括杀死作业、杀死任务、提

交任务、运行任务等,具体参考6.3.2节*/

Set<JobID>recoveredJobs=new HashSet<JobID>();//恢复完成的作业列表,具体参考

6.2.4节

……

}


该函数的内部实现逻辑主要分为两个步骤:更新状态和下达命令。JobTracker首先将TaskTracker汇报的最新任务运行状态保存到相应数据结构中,然后根据这些状态信息和外界需求(比如用户杀死一个作业)为其下达相应的命令。

6.3.1 更新状态

函数heartbeat首先会更新TaskTracker/Job/Task的状态信息。相关代码如下:


/检查是否允许该TaskTracker连接JobTracker。当一个TaskTracker在host list(由参数mapred.hosts指定)中,但不在exclude list(由参数mapred.hosts.exclude指定)中时,可接入JobTracker/

if(!acceptTaskTracker(status)){

throw new DisallowedTaskTrackerException(status);

}

……

/如果该TaskTracker被重启了,则将之标注为健康的TaskTracker,并从黑名单或者灰名单中清除(关于黑名单与灰名单的介绍,参考6.5.2节),否则,启动TaskTracker容错机制以检查它是否处于健康状态/

if(restarted){

faultyTrackers.markTrackerHealthy(status.getHost());

}else{

faultyTrackers.checkTrackerFaultTimeout(status.getHost(),now);

}

……

short newResponseId=(short)(responseId+1);//响应编号加1

/*记录心跳发送时间,以发现在一定时间内未发送心跳的TaskTracker,并将之标注为死亡的

TaskTracker,此后不可再向其分配新任务*/

status.setLastSeen(now);

if(!processHeartbeat(status, initialContact, now)){//处理心跳

……


接下来,跟踪进入函数processHeartbeat内部。该函数首先进行一系列异常情况检查,然后调用以下两个函数更新TaskTracker/Job/Task的状态信息:


updateTaskStatuses(trackerStatus);//更新Task状态信息

updateNodeHealthStatus(trackerStatus, timeStamp);//更新节点健康状态