7.3 心跳机制

7.3.1 单次心跳发送

第6章中已经提到,JobTracker与TaskTracker之间采用了pull通信模型,即JobTracker从不会主动与TaskTracker通信,而总是被动等待TaskTracker汇报信息并领取其对应的命令。TaskTracker周期性地向JobTracker汇报信息并领取任务形成心跳。

在TaskTracker类的run方法中维护了一个无限循环,用于通过心跳发送状态信息和接收命令,代码框架如下:


public void run(){

……

while(running&&!shuttingDown&&!denied){

……

while(running&&!staleState&&!shuttingDown&&!denied){

……

State osState=offerService();

……

}

}

}


其中,offerService函数代码框架如下:


State offerService()throws Exception{

……

while(running&&!shuttingDown){

//判断是否到达心跳发送时间

……

//发送心跳

HeartbeatResponse heartbeatResponse=transmitHeartBeat(now);

//执行心跳响应heartbeatResponse中的各种命令

……

markUnresponsiveTasks();//杀死一定时间内未汇报进度的任务

killOverflowingTasks();//剩余磁盘空间小于mapred.local.dir.minspacekill时,寻找合

适的任务将其杀掉以释放空间

……

}

}


TaskTracker的单次心跳发送过程如图7-2所示,可分为以下几个步骤。

7.3 心跳机制 - 图1

图 7-2 TaskTracker单次心跳发送过程

步骤1 判断是否到达心跳发送时间。

TaskTracker的心跳间隔由集群规模和任务运行情况共同决定。

1)集群规模:JobTracker能够根据当前集群规模(TaskTracker数量)动态调整TaskTracker的心跳间隔,并将下一次心跳间隔放到TaskTracker的本次心跳应答中。

2)任务运行情况:为了提高任务的响应时间和资源利用率,TaskTracker一旦发现存在某个任务运行完成或者失败,就会立即缩短心跳间隔,以便将任务完成或失败的消息告诉JobTracker,进而快速重新分配任务,我们将这种特殊的心跳称为“带外心跳”。TaskTracker包含以下两个配置参数设置带外心跳。

❑mapreduce. tasktracker.outofband.heartbeat:决定是否启用带外心跳机制。默认值是false,表示不启用。

❑mapreduce. tasktracker.outofband.heartbeat.damper:心跳收缩因子。默认值是1 000 000。当启用带外心跳机制时,如果某时刻有X个任务运行完成,则心跳间隔变为heartbeatInterval/(X*oobHeartbeatDamper+1)

其中,heartbeatInterval是从JobTracker端获取的心跳间隔;oobHeartbeatDamper是心跳收缩因子(mapreduce.tasktracker.outofband.heartbeat.damper对应的值)。

步骤2 如果TaskTracker刚启动,则需要检查代码编译版本与JobTracker是否一致。

只有与JobTracker具有相同代码编译版本号的TaskTracker才能够向JobTracker发送心跳。代码编译版本号由Hadoop版本号、修订版本号、代码编译用户和校验和四部分组成,如“1.0.0-dev from 451451 by dongxicheng source checksum e54b3f6cb07ea1cd83 3d1ab0b947ac39”,其中,“1.0.0-dev”表示Hadoop版本号,“451451”表示修订版本号,“dongxicheng”表示代码编译用户,“e54b3f6cb07ea1cd833d1ab0b947ac39”表示校验和。

步骤3 检查是否有磁盘损坏。

MapReduce计算过程中最重要的输出目录是参数mapred.local.dir指定的中间结果存放目录(通常由多个目录组成,每个目录对应一个磁盘块)。由于这些目录存放在本地磁盘且没有备份,因此一旦损坏或者丢失后,需重新计算。TaskTracker初始化时会检查mapred.local.dir指定的磁盘目录列表,并将正常目录存放起来,之后,TaskTracker周期性(时间间隔由mapred.disk.healthChecker.interval指定,默认是60 s)检查这些正常目录,如果发现出现故障的目录,则TaskTracker会重新对自己初始化。

步骤4 发送心跳。

TaskTracker将当前节点运行时信息,比如资源使用情况、任务运行状态等,通过心跳汇报给JobTracker,同时接收来自JobTracker的各种命令。

步骤5 接收并执行命令。

JobTracker收到TaskTracker的心跳信息后,会为之下达命令。

接下来我们重点分析发送心跳和命令执行两个过程。