6.3.2 下达命令
更新完状态信息后,JobTracker要为TaskTracker构造一个HeartbeatResponse对象作为心跳应答。该对象主要有两部分内容:下达给TaskTracker的命令和下次汇报心跳的时间间隔。下面分别对它们进行介绍。
1.下达命令
JobTracker将下达给TaskTracker的命令封装成TaskTrackerAction类,主要包括ReinitTrackerAction(重新初始化)、LaunchTaskAction(运行新任务)、KillTaskAction(杀死任务)、KillJobAction(杀死作业)和CommitTaskAction(提交任务)五种。下面依次对这几个命令进行介绍。
(1)ReinitTrackerAction
JobTracker收到TaskTracker发送过来的心跳信息后,首先要进行一致性检查,如果发现异常情况,则会要求TaskTracker重新对自己进行初始化,以恢复到一致的状态。当出现以下两种不一致情况时,JobTracker会向TaskTracker下达ReinitTrackerAction命令。
❑丢失上次心跳应答信息:JobTracker会保存向每个TaskTracker发送的最近心跳应答信息,如果JobTracker未刚刚重启且一个TaskTracker并非初次连接JobTracker(initialContact!=true),而最近的心跳应答信息丢失了,则这是一种不一致状态。
❑丢失TaskTracker状态信息:JobTracker接收到任何一个心跳信息后,会将TaskTracker状态(封装在类TaskTrackerStatus中)信息保存起来。如果一个TaskTracker非初次连接JobTracker但状态信息却不存在,则也是一种不一致状态。
(2)LaunchTaskAction
该类封装了TaskTracker新分配的任务。TaskTracker接收到该命令后会启动一个子进程运行该任务。Hadoop将一个作业分解后的任务分成两大类:计算型任务和辅助型任务。其中,计算型任务是处理实际数据的任务,包括Map Task和Reduce Task两种(对应TaskType类中的MAP和REDUCE两种类型),由专门的任务调度器对它们进行调度;而辅助型任务则不会处理实际的数据,通常用于同步计算型任务或者清理磁盘上无用的目录,包括job-setup task、job-cleanup task和task-cleanup task三种(对应TaskType类中的JOB_SETUP, JOB_CLEANUP和TASK_CLEANUP三种类型),其中,job-setup task和job-cleanup task分别用作计算型任务开始运行同步标识和结束运行同步标识,其具体特点已在第4章进行了介绍,而task-cleanup task则用于清理失败的计算型任务已经写到磁盘上的部分结果,这种任务由JobTracker负责调度,且运行优先级高于计算型任务。
如果一个正常(不在黑名单中)的TaskTracker尚有空闲slot(acceptNewTasks为true),则JobTracker会为该TaskTracker分配新任务,任务选择顺序是:先辅助型任务,再计算型任务。而对于辅助型任务,选择顺序依次为job-cleanup task、task-cleanup task和job-setup task,具体代码如下:
/*优先选择辅助型任务,选择优先级从高到低依次为:job-cleanup task、task-cleanup task和
job-setup task,这样可以让运行完成的作业快速结束,新提交的作业立刻进入运行状态*/
List<Task>tasks=getSetupAndCleanupTasks(taskTrackerStatus);
//如果没有辅助型任务,则选择计算型任务
if(tasks==null){
//由任务调度器选择一个或多个计算型任务
tasks=taskScheduler.assignTasks(taskTrackers.get(trackerName));
}
if(tasks!=null){
for(Task task:tasks){
expireLaunchingTasks.addNewTask(task.getTaskID());
//将分配的任务封装成LaunchTaskAction对象
actions.add(new LaunchTaskAction(task));
}
}
(3)KillTaskAction
该类封装了TaskTracker需杀死的任务。TaskTracker收到该命令后会杀掉对应任务、清理工作目录和释放slot。导致JobTracker向TaskTracker发送该命令的原因有很多,主要包括以下几个场景:
❑用户使用命令“bin/hadoop job-kill-task”或者“bin/hadoop job-fail-task”杀死一个任务或者使一个任务失败。
❑启用推测执行机制后,同一份数据可能同时由两个Task Attempt处理。当其中一个Task Attempt执行成功后,另外一个处理相同数据的Task Attempt将被杀掉。
❑某个作业运行失败,它的所有任务将被杀掉。
❑TaskTracker在一定时间内未汇报心跳,则JobTracker认为其死掉,它上面的所有Task均被标注为死亡。
(4)KillJobAction
该类封装了TaskTracker待清理的作业。TaskTracker接收到该命令后,会清理作业的临时目录。导致JobTracker向TaskTracker发送该命令的原因有很多,主要包括以下几个场景:
❑用户使用命令“bin/hadoop job-kill”或者“bin/hadoop job-fail”杀死一个作业或者使一个作业失败。
❑作业运行完成,通知TaskTracker清理该作业的工作目录。
❑作业运行失败,即同一个作业失败的Task数目超过一定比例。
(5)CommitTaskAction
该类封装了TaskTracker需提交的任务。为了防止同一个TaskInProgress的两个同时运行的Task Attempt(比如打开推测执行功能,一个任务可能存在备份任务)同时打开一个文件或者往一个文件中写数据而产生冲突,Hadoop让每个Task Attempt写到单独一个文件(以TaskAttemptID命名,比如attempt201208071706_0008_r_000000_0)中。通常而言,Hadoop让每个Task Attempt将计算结果写到临时目录${mapred.output.dir}/_temporary/${taskid}中,当某个Task Attempt成功运行完成后,再将运算结果转移到最终目录${mapred.output.dir}中。Hadoop将一个成功运行完成的Task Attempt结果文件从临时目录“提升”至最终目录的过程,称为“任务提交”。当TaskInProgress中一个任务被提交后,其他任务将被杀死,同时意味着该TaskInProgress运行完成。
2.调整心跳间隔
TaskTracker心跳时间间隔大小应该适度,如果太小,则JobTracker需要处理高并发的心跳连接请求,必然产生不小的并发压力;如果太大,空闲的资源不能及时汇报给JobTracker(进而为之分配新的Task),造成资源空闲,进而降低系统吞吐率。
TaskTracker汇报心跳的时间间隔并不是一成不变的,它会随着集群规模的动态调整(比如节点死掉或者用户动态添加新节点)而变化,以便能够合理利用JobTracker的并发处理能力。在Hadoop MapReduce中,只有JobTracker知道某一时刻集群的规模,因此由JobTracker为每个TaskTracker计算下一次汇报心跳的时间间隔,并通过心跳机制告诉TaskTracker。
JobTracker允许用户通过参数配置心跳的时间间隔加速比,即每增加mapred.heartbeats.in.second(默认是100,最小是1)个节点,心跳时间间隔增加mapreduce.jobtracker.heartbeats.scaling.factor(默认是1,最小是0.01)秒。同时,为了防止用户参数设置不合理而对JobTracker产生较大负载,JobTracker要求心跳时间间隔至少为3秒[1]。具体计算方法如下:
public int getNextHeartbeatInterval(){
//获取当前TaskTracker总数,即集群当前规模
int clusterSize=getClusterStatus().getTaskTrackers();
//计算新的心跳间隔
int heartbeatInterval=Math.max(
(int)(1000HEARTBEATS_SCALING_FACTOR
Math.ceil((double)clusterSize/
NUM_HEARTBEATS_IN_SECOND)),
HEARTBEAT_INTERVAL_MIN);
return heartbeatInterval;
}
[1]考虑在中小规模集群下,心跳时间间隔至少为3秒,这会降低系统吞吐率,在最新版本中,已经调整为300毫秒,具体参考:https://issues.apache.org/jira/browse/MAPREDUCE-1906。