6.3 心跳接收与应答
心跳是沟通TaskTracker与JobTracker的桥梁,它实际上是一个RPC函数。TaskTracker周期性地调用该函数汇报节点和任务状态信息,从而形成心跳。在Hadoop中,心跳主要有三个作用:
❑判断TaskTracker是否活着。
❑及时让JobTracker获取各个节点上的资源使用情况和任务运行状态。
❑为TaskTracker分配任务。
注意 JobTracker与TaskTracker之间采用了“pull”而不是“push”模型,即JobTracker从不会主动向TaskTracker发送任何信息,而是由TaskTracker主动通过心跳“领取”属于自己的信息。JobTracker只能通过心跳应答的形式为各个TaskTracker分配任务。
TaskTracker周期性地调用RPC函数heartbeat向JobTracker汇报信息和领取任务。该函数定义如下:
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
boolean restarted,
boolean initialContact,
boolean acceptNewTasks,
short responseId)
该函数的各个参数含义如下。
❑Status:该参数封装了TaskTracker上的各种状态信息,包括
String trackerName;//TaskTracker名称,形式如tracker_mymachine:localhost.
localdomain/127.0.0.1:34196
String host;//TaskTracker主机名
int httpPort;//TaskTracker对外的HTTP端口号
int failures;//该TaskTracker上已经失败的任务总数
List<TaskStatus>taskReports;//正在运行的各个任务运行状态
volatile long lastSeen;//上次汇报心跳的时间
private int maxMapTasks;/*Map slot总数,即允许同时运行的Map Task总数,由参数mapred.
tasktracker.map.tasks.maximum设定*/
private int maxReduceTasks;//Reduce slot总数
private TaskTrackerHealthStatus healthStatus;//TaskTracker健康状态
private ResourceStatus resStatus;//TaskTracker资源(内存,CPU等)信息
❑Restarted:表示TaskTracker是否刚刚重新启动。
❑initialContact:表示TaskTracker是否初次连接JobTracker。
❑acceptNewTasks:表示TaskTracker是否可以接收新任务,这通常取决于slot是否有剩余和节点健康状况等。
❑responseId:表示心跳响应编号,用于防止重复发送心跳。每接收一次心跳后,该值加1。
该函数的返回值为一个HeartbeatResponse对象,该对象主要封装了JobTracker向TaskTracker下达的命令,具体如下:
class HeartbeatResponse implements Writable, Configurable{
……
short responseId;//心跳响应编号
int heartbeatInterval;//下次心跳的发送间隔
TaskTrackerAction[]actions;/*来自JobTracker的命令,可能包括杀死作业、杀死任务、提
交任务、运行任务等,具体参考6.3.2节*/
Set<JobID>recoveredJobs=new HashSet<JobID>();//恢复完成的作业列表,具体参考
6.2.4节
……
}
该函数的内部实现逻辑主要分为两个步骤:更新状态和下达命令。JobTracker首先将TaskTracker汇报的最新任务运行状态保存到相应数据结构中,然后根据这些状态信息和外界需求(比如用户杀死一个作业)为其下达相应的命令。
6.3.1 更新状态
函数heartbeat首先会更新TaskTracker/Job/Task的状态信息。相关代码如下:
/检查是否允许该TaskTracker连接JobTracker。当一个TaskTracker在host list(由参数mapred.hosts指定)中,但不在exclude list(由参数mapred.hosts.exclude指定)中时,可接入JobTracker/
if(!acceptTaskTracker(status)){
throw new DisallowedTaskTrackerException(status);
}
……
/如果该TaskTracker被重启了,则将之标注为健康的TaskTracker,并从黑名单或者灰名单中清除(关于黑名单与灰名单的介绍,参考6.5.2节),否则,启动TaskTracker容错机制以检查它是否处于健康状态/
if(restarted){
faultyTrackers.markTrackerHealthy(status.getHost());
}else{
faultyTrackers.checkTrackerFaultTimeout(status.getHost(),now);
}
……
short newResponseId=(short)(responseId+1);//响应编号加1
/*记录心跳发送时间,以发现在一定时间内未发送心跳的TaskTracker,并将之标注为死亡的
TaskTracker,此后不可再向其分配新任务*/
status.setLastSeen(now);
if(!processHeartbeat(status, initialContact, now)){//处理心跳
……
接下来,跟踪进入函数processHeartbeat内部。该函数首先进行一系列异常情况检查,然后调用以下两个函数更新TaskTracker/Job/Task的状态信息:
updateTaskStatuses(trackerStatus);//更新Task状态信息
updateNodeHealthStatus(trackerStatus, timeStamp);//更新节点健康状态